This is an automated email from the ASF dual-hosted git repository.
hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9365210 [IOTDB-1077] [C++ Client] Add insertRecordsOfOneDevice()
interface for C++ client (#2513)
9365210 is described below
commit 93652105543f99381dee3d3e24e0fb41918a3c6f
Author: wshao08 <[email protected]>
AuthorDate: Fri Jan 29 14:45:07 2021 +0800
[IOTDB-1077] [C++ Client] Add insertRecordsOfOneDevice() interface for C++
client (#2513)
---
client-cpp/src/main/CMakeLists.txt | 1 +
client-cpp/src/main/Session.cpp | 173 ++++++++++++++++++++++++++++++++--
client-cpp/src/main/Session.h | 18 +++-
client-cpp/src/test/cpp/sessionIT.cpp | 133 ++++++++++++++++++++++----
4 files changed, 299 insertions(+), 26 deletions(-)
diff --git a/client-cpp/src/main/CMakeLists.txt
b/client-cpp/src/main/CMakeLists.txt
index 27c359a..6e4e468 100644
--- a/client-cpp/src/main/CMakeLists.txt
+++ b/client-cpp/src/main/CMakeLists.txt
@@ -21,6 +21,7 @@ PROJECT(iotdb_session CXX)
SET(CMAKE_CXX_STANDARD 11)
SET(CMAKE_CXX_STANDARD_REQUIRED ON)
SET(CMAKE_POSITION_INDEPENDENT_CODE ON)
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
SET(TOOLS_DIR "${CMAKE_SOURCE_DIR}/../../../../compile-tools")
FIND_PACKAGE(Boost REQUIRED)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index a308e1c..176af63 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -370,6 +370,15 @@ bool Session::checkSorted(Tablet& tablet) {
return true;
}
+bool Session::checkSorted(vector<int64_t>& times) {
+ for (int i = 1; i < times.size(); i++) {
+ if (times[i] < times[i - 1]) {
+ return false;
+ }
+ }
+ return true;
+}
+
void Session::sortTablet(Tablet& tablet) {
/*
* following part of code sort the batch data by time,
@@ -408,12 +417,63 @@ void Session::sortIndexByTimestamp(int* index, int64_t*
timestamps, int length)
}
}
-vector<string> Session::sortList(vector<string>& valueList, int* index, int
indexLength) {
- vector<string> sortedValues(valueList.size());
- for (int i = 0; i < indexLength; i++) {
- sortedValues[i] = valueList[index[i]];
+/**
+ * Append value into buffer in Big Endian order to comply with IoTDB server
+ */
+void Session::appendValues(string &buffer, char* value, int size) {
+ for (int i = size - 1; i >= 0; i--) {
+ buffer.append(value + i, 1);
+ }
+}
+
+void Session::putValuesIntoBuffer(vector<TSDataType::TSDataType>& types,
vector<char*>& values, string& buf) {
+ for (int i = 0; i < values.size(); i++) {
+ int8_t typeNum = getDataTypeNumber(types[i]);
+ buf.append((char*)(&typeNum), sizeof(int8_t));
+ switch (types[i]) {
+ case TSDataType::BOOLEAN:
+ buf.append(values[i], 1);
+ break;
+ case TSDataType::INT32:
+ appendValues(buf, values[i], sizeof(int32_t));
+ break;
+ case TSDataType::INT64:
+ appendValues(buf, values[i], sizeof(int64_t));
+ break;
+ case TSDataType::FLOAT:
+ appendValues(buf, values[i], sizeof(float));
+ break;
+ case TSDataType::DOUBLE:
+ appendValues(buf, values[i], sizeof(double));
+ break;
+ case TSDataType::TEXT:
+ string str(values[i]);
+ int len = str.length();
+ appendValues(buf, (char*)(&len), sizeof(int));
+ // no need to change the byte order of string value
+ buf.append(values[i], len);
+ break;
+ }
+ }
+}
+
+int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
+ switch (type) {
+ case TSDataType::BOOLEAN:
+ return 0;
+ case TSDataType::INT32:
+ return 1;
+ case TSDataType::INT64:
+ return 2;
+ case TSDataType::FLOAT:
+ return 3;
+ case TSDataType::DOUBLE:
+ return 4;
+ case TSDataType::TEXT:
+ return 5;
+ default:
+ return -1;
}
- return sortedValues;
}
void Session::open()
@@ -555,6 +615,26 @@ void Session::insertRecord(string deviceId, int64_t time,
vector<string>& measu
}
}
+void Session::insertRecord(string deviceId, int64_t time, vector<string>&
measurements,
+ vector<TSDataType::TSDataType>& types, vector<char*>& values)
+{
+ shared_ptr<TSInsertRecordReq> req(new TSInsertRecordReq());
+ req->__set_sessionId(sessionId);
+ req->__set_deviceId(deviceId);
+ req->__set_timestamp(time);
+ req->__set_measurements(measurements);
+ string buffer;
+ putValuesIntoBuffer(types, values, buffer);
+ req->__set_values(buffer);
+ shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ client->insertRecord(*resp,*req);
+ RpcUtils::verifySuccess(*resp);
+ } catch (IoTDBConnectionException& e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times,
vector<vector<string>>& measurementsList, vector<vector<string>>& valuesList) {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
@@ -580,13 +660,90 @@ void Session::insertRecords(vector<string>& deviceIds,
vector<int64_t>& times, v
}
}
+void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times,
+ vector<vector<string>>& measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
+ vector<vector<char*>>& valuesList) {
+ int len = deviceIds.size();
+ if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
+ logic_error e("deviceIds, times, measurementsList and valuesList's
size should be equal");
+ throw exception(e);
+ }
+ shared_ptr<TSInsertRecordsReq> request(new TSInsertRecordsReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceIds(deviceIds);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ vector<string> bufferList;
+ for (int i = 0; i < valuesList.size(); i++) {
+ string buffer;
+ putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+ bufferList.push_back(buffer);
+ }
+ request->__set_valuesList(bufferList);
+
+ try {
+ shared_ptr<TSStatus> resp(new TSStatus());
+ client->insertRecords(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ } catch (IoTDBConnectionException& e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
+ vector<vector<string>> measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
+ vector<vector<char*>>& valuesList) {
+ insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList,
valuesList, false);
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
+ vector<vector<string>> measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
+ vector<vector<char*>>& valuesList, bool sorted) {
+
+ if (sorted) {
+ if (!checkSorted(times)) {
+ throw BatchExecutionException("Times in InsertOneDeviceRecords are
not in ascending order");
+ }
+ } else {
+ int* index = new int[times.size()];
+ for (int i = 0; i < times.size(); i++) {
+ index[i] = i;
+ }
+
+ this->sortIndexByTimestamp(index, times.data(), times.size());
+ sort(times.begin(), times.end());
+ measurementsList = sortList(measurementsList, index, times.size());
+ typesList = sortList(typesList, index, times.size());
+ valuesList = sortList(valuesList, index, times.size());
+ }
+ unique_ptr<TSInsertRecordsOfOneDeviceReq> request(new
TSInsertRecordsOfOneDeviceReq());
+ request->__set_sessionId(sessionId);
+ request->__set_deviceId(deviceId);
+ request->__set_timestamps(times);
+ request->__set_measurementsList(measurementsList);
+ vector<string> bufferList;
+ for (int i = 0; i < valuesList.size(); i++) {
+ string buffer;
+ putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+ bufferList.push_back(buffer);
+ }
+ request->__set_valuesList(bufferList);
+
+ try {
+ unique_ptr<TSStatus> resp(new TSStatus());
+ client->insertRecordsOfOneDevice(*resp, *request);
+ RpcUtils::verifySuccess(*resp);
+ } catch (const exception& e) {
+ throw IoTDBConnectionException(e.what());
+ }
+}
void Session::insertTablet(Tablet& tablet) {
try
{
insertTablet(tablet, false);
}
- catch (const std::exception& e)
+ catch (const exception& e)
{
logic_error error(e.what());
throw exception(error);
@@ -630,7 +787,7 @@ void Session::insertTablets(map<string, Tablet*>& tablets) {
{
insertTablets(tablets, false);
}
- catch (const std::exception& e)
+ catch (const exception& e)
{
logic_error error(e.what());
throw exception(error);
@@ -669,7 +826,7 @@ void Session::insertTablets(map<string, Tablet*>& tablets,
bool sorted) {
client->insertTablets(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (const std::exception& e)
+ catch (const exception& e)
{
throw IoTDBConnectionException(e.what());
}
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 41a72f0..45ee51c 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -559,6 +559,14 @@ public:
void closeOperationHandle();
};
+template<typename T>
+std::vector<T> sortList(std::vector<T>& valueList, int* index, int
indexLength) {
+ std::vector<T> sortedValues(valueList.size());
+ for (int i = 0; i < indexLength; i++) {
+ sortedValues[i] = valueList[index[i]];
+ }
+ return sortedValues;
+}
class Session
{
@@ -579,11 +587,14 @@ class Session
const static int DEFAULT_TIMEOUT_MS = 0;
bool checkSorted(Tablet& tablet);
+ bool checkSorted(std::vector<int64_t>& times);
void sortTablet(Tablet& tablet);
- std::vector<std::string> sortList(std::vector<std::string>& valueList,
int* index, int indexLength);
void sortIndexByTimestamp(int* index, int64_t* timestamps, int length);
std::string getTimeZone();
void setTimeZone(std::string zoneId);
+ void appendValues(std::string &buffer, char* value, int size);
+ void putValuesIntoBuffer(std::vector<TSDataType::TSDataType>& types,
std::vector<char*>& values, std::string& buf);
+ int8_t getDataTypeNumber(TSDataType::TSDataType type);
public:
Session(std::string host, int rpcPort) : username("user"),
password("password") {
this->host = host;
@@ -623,8 +634,11 @@ class Session
void open(bool enableRPCCompression, int connectionTimeoutInMs);
void close();
void insertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<std::string>& values);
- //void insertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<TSDataType::Type>& types, )
+ void insertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<TSDataType::TSDataType>&
types, std::vector<char*>& values);
void insertRecords(std::vector<std::string>& deviceIds,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>&
measurementsList, std::vector<std::vector<std::string>>& valuesList);
+ void insertRecords(std::vector<std::string>& deviceIds,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>&
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList);
+ void insertRecordsOfOneDevice(std::string deviceId,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList);
+ void insertRecordsOfOneDevice(std::string deviceId,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList, bool sorted);
void insertTablet(Tablet& tablet);
void insertTablet(Tablet& tablet, bool sorted);
void insertTablets(std::map<std::string, Tablet*>& tablets);
diff --git a/client-cpp/src/test/cpp/sessionIT.cpp
b/client-cpp/src/test/cpp/sessionIT.cpp
index d38b693..5d724c2 100644
--- a/client-cpp/src/test/cpp/sessionIT.cpp
+++ b/client-cpp/src/test/cpp/sessionIT.cpp
@@ -55,16 +55,10 @@ TEST_CASE( "Delete timeseries success",
"[deleteTimeseries]" ) {
TEST_CASE( "Test insertRecord by string", "[testInsertRecord]") {
prepareTimeseries();
string deviceId = "root.test.d1";
- vector<string> measurements;
- measurements.push_back("s1");
- measurements.push_back("s2");
- measurements.push_back("s3");
+ vector<string> measurements = { "s1", "s2", "s3" };
for (long time = 0; time < 100; time++) {
- vector<string> values;
- values.push_back("1");
- values.push_back("2");
- values.push_back("3");
+ vector<string> values = { "1", "2", "3" };
session->insertRecord(deviceId, time, measurements, values);
}
@@ -87,20 +81,14 @@ TEST_CASE( "Test insertRecord by string",
"[testInsertRecord]") {
TEST_CASE( "Test insertRecords ", "[testInsertRecords]") {
prepareTimeseries();
string deviceId = "root.test.d1";
- vector<string> measurements;
- measurements.push_back("s1");
- measurements.push_back("s2");
- measurements.push_back("s3");
+ vector<string> measurements = { "s1", "s2", "s3" };
vector<string> deviceIds;
vector<vector<string>> measurementsList;
vector<vector<string>> valuesList;
vector<int64_t> timestamps;
for (int64_t time = 0; time < 500; time++) {
- vector<string> values;
- values.push_back("1");
- values.push_back("2");
- values.push_back("3");
+ vector<string> values = { "1", "2", "3" };
deviceIds.push_back(deviceId);
measurementsList.push_back(measurements);
@@ -131,6 +119,119 @@ TEST_CASE( "Test insertRecords ", "[testInsertRecords]") {
REQUIRE( count == 500 );
}
+TEST_CASE( "Test insertRecord with types ", "[testTypedInsertRecord]") {
+ vector<string> timeseries =
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+ vector<TSDataType::TSDataType> types = { TSDataType::INT32,
TSDataType::DOUBLE, TSDataType::INT64 };
+
+ for (int i = 0; i < timeseries.size(); i++) {
+ if (session->checkTimeseriesExists(timeseries[i])) {
+ session->deleteTimeseries(timeseries[i]);
+ }
+ session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE,
CompressionType::SNAPPY);
+ }
+ string deviceId = "root.test.d1";
+ vector<string> measurements = { "s1", "s2", "s3" };
+ vector<int32_t> value1(100, 1);
+ vector<double> value2(100, 2.2);
+ vector<int64_t> value3(100, 3);
+
+ for (long time = 0; time < 100; time++) {
+ vector<char *> values = { (char *)(&value1[time]), (char
*)(&value2[time]), (char *)(&value3[time]) };
+ session->insertRecord(deviceId, time, measurements, types, values);
+ }
+
+ SessionDataSet *sessionDataSet = session->executeQueryStatement("select
s1,s2,s3 from root.test.d1");
+ sessionDataSet->setBatchSize(1024);
+ long count = 0;
+ while (sessionDataSet->hasNext()) {
+ sessionDataSet->next();
+ count++;
+ }
+ REQUIRE( count == 100 );
+}
+
+TEST_CASE( "Test insertRecords with types ", "[testTypedInsertRecords]") {
+ vector<string> timeseries =
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+ vector<TSDataType::TSDataType> types = { TSDataType::INT32,
TSDataType::DOUBLE, TSDataType::INT64 };
+
+ for (int i = 0; i < timeseries.size(); i++) {
+ if (session->checkTimeseriesExists(timeseries[i])) {
+ session->deleteTimeseries(timeseries[i]);
+ }
+ session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE,
CompressionType::SNAPPY);
+ }
+ string deviceId = "root.test.d1";
+ vector<string> measurements = { "s1", "s2", "s3" };
+ vector<string> deviceIds;
+ vector<vector<string>> measurementsList;
+ vector<vector<TSDataType::TSDataType>> typesList;
+ vector<vector<char*>> valuesList;
+ vector<int64_t> timestamps;
+ vector<int32_t> value1(100, 1);
+ vector<double> value2(100, 2.2);
+ vector<int64_t> value3(100, 3);
+
+ for (int64_t time = 0; time < 100; time++) {
+ vector<char *> values = { (char *)(&value1[time]), (char
*)(&value2[time]), (char *)(&value3[time]) };
+ deviceIds.push_back(deviceId);
+ measurementsList.push_back(measurements);
+ typesList.push_back(types);
+ valuesList.push_back(values);
+ timestamps.push_back(time);
+ }
+
+ session->insertRecords(deviceIds, timestamps, measurementsList, typesList,
valuesList);
+
+ SessionDataSet *sessionDataSet = session->executeQueryStatement("select *
from root.test.d1");
+ sessionDataSet->setBatchSize(1024);
+ int count = 0;
+ while (sessionDataSet->hasNext()) {
+ sessionDataSet->next();
+ count++;
+ }
+ REQUIRE( count == 100 );
+}
+
+TEST_CASE( "Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") {
+ vector<string> timeseries =
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+ vector<TSDataType::TSDataType> types = { TSDataType::INT32,
TSDataType::DOUBLE, TSDataType::INT64 };
+
+ for (int i = 0; i < timeseries.size(); i++) {
+ if (session->checkTimeseriesExists(timeseries[i])) {
+ session->deleteTimeseries(timeseries[i]);
+ }
+ session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE,
CompressionType::SNAPPY);
+ }
+ string deviceId = "root.test.d1";
+ vector<string> measurements = { "s1", "s2", "s3" };
+ vector<vector<string>> measurementsList;
+ vector<vector<TSDataType::TSDataType>> typesList;
+ vector<vector<char*>> valuesList;
+ vector<int64_t> timestamps;
+ vector<int32_t> value1(100, 1);
+ vector<double> value2(100, 2.2);
+ vector<int64_t> value3(100, 3);
+
+ for (int64_t time = 0; time < 100; time++) {
+ vector<char *> values = { (char *)(&value1[time]), (char
*)(&value2[time]), (char *)(&value3[time]) };
+ measurementsList.push_back(measurements);
+ typesList.push_back(types);
+ valuesList.push_back(values);
+ timestamps.push_back(time);
+ }
+
+ session->insertRecordsOfOneDevice(deviceId, timestamps, measurementsList,
typesList, valuesList);
+
+ SessionDataSet *sessionDataSet = session->executeQueryStatement("select *
from root.test.d1");
+ sessionDataSet->setBatchSize(1024);
+ int count = 0;
+ while (sessionDataSet->hasNext()) {
+ sessionDataSet->next();
+ count++;
+ }
+ REQUIRE( count == 100 );
+}
+
TEST_CASE( "Test insertTablet ", "[testInsertTablet]") {
prepareTimeseries();
string deviceId = "root.test.d1";