This is an automated email from the ASF dual-hosted git repository.

hxd pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9365210  [IOTDB-1077] [C++ Client] Add insertRecordsOfOneDevice() 
interface for C++ client (#2513)
9365210 is described below

commit 93652105543f99381dee3d3e24e0fb41918a3c6f
Author: wshao08 <[email protected]>
AuthorDate: Fri Jan 29 14:45:07 2021 +0800

    [IOTDB-1077] [C++ Client] Add insertRecordsOfOneDevice() interface for C++ 
client (#2513)
---
 client-cpp/src/main/CMakeLists.txt    |   1 +
 client-cpp/src/main/Session.cpp       | 173 ++++++++++++++++++++++++++++++++--
 client-cpp/src/main/Session.h         |  18 +++-
 client-cpp/src/test/cpp/sessionIT.cpp | 133 ++++++++++++++++++++++----
 4 files changed, 299 insertions(+), 26 deletions(-)

diff --git a/client-cpp/src/main/CMakeLists.txt 
b/client-cpp/src/main/CMakeLists.txt
index 27c359a..6e4e468 100644
--- a/client-cpp/src/main/CMakeLists.txt
+++ b/client-cpp/src/main/CMakeLists.txt
@@ -21,6 +21,7 @@ PROJECT(iotdb_session CXX)
 SET(CMAKE_CXX_STANDARD 11)
 SET(CMAKE_CXX_STANDARD_REQUIRED ON)
 SET(CMAKE_POSITION_INDEPENDENT_CODE ON)
+SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++1y")
 SET(TOOLS_DIR "${CMAKE_SOURCE_DIR}/../../../../compile-tools")
 
 FIND_PACKAGE(Boost REQUIRED)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index a308e1c..176af63 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -370,6 +370,15 @@ bool Session::checkSorted(Tablet& tablet) {
     return true;
 }
 
+bool Session::checkSorted(vector<int64_t>& times) {
+    for (int i = 1; i < times.size(); i++) {
+        if (times[i] < times[i - 1]) {
+            return false;
+        }
+    }
+    return true;
+}
+
 void Session::sortTablet(Tablet& tablet) {
     /*
      * following part of code sort the batch data by time,
@@ -408,12 +417,63 @@ void Session::sortIndexByTimestamp(int* index, int64_t* 
timestamps, int length)
     }
 }
 
-vector<string> Session::sortList(vector<string>& valueList, int* index, int 
indexLength) {
-    vector<string> sortedValues(valueList.size());
-    for (int i = 0; i < indexLength; i++) {
-        sortedValues[i] = valueList[index[i]];
+/**
+ * Append value into buffer in Big Endian order to comply with IoTDB server
+ */
+void Session::appendValues(string &buffer, char* value, int size) {
+    for (int i = size - 1; i >= 0; i--) {
+        buffer.append(value + i, 1);
+    }
+}
+
+void Session::putValuesIntoBuffer(vector<TSDataType::TSDataType>& types, 
vector<char*>& values, string& buf) {
+    for (int i = 0; i < values.size(); i++) {
+        int8_t typeNum = getDataTypeNumber(types[i]);
+        buf.append((char*)(&typeNum), sizeof(int8_t));
+        switch (types[i]) {
+            case TSDataType::BOOLEAN:
+                buf.append(values[i], 1);
+                break;
+            case TSDataType::INT32:
+                appendValues(buf, values[i], sizeof(int32_t));
+                break;
+            case TSDataType::INT64:
+                appendValues(buf, values[i], sizeof(int64_t));
+                break;
+            case TSDataType::FLOAT:
+                appendValues(buf, values[i], sizeof(float));
+                break;
+            case TSDataType::DOUBLE:
+                appendValues(buf, values[i], sizeof(double));
+                break;
+            case TSDataType::TEXT:
+                string str(values[i]);
+                int len = str.length();
+                appendValues(buf, (char*)(&len), sizeof(int));
+                // no need to change the byte order of string value
+                buf.append(values[i], len);
+                break;
+        }
+    }
+}
+
+int8_t Session::getDataTypeNumber(TSDataType::TSDataType type) {
+    switch (type) {
+        case TSDataType::BOOLEAN:
+            return 0;
+        case TSDataType::INT32:
+            return 1;
+        case TSDataType::INT64:
+            return 2;
+        case TSDataType::FLOAT:
+            return 3;
+        case TSDataType::DOUBLE:
+            return 4;
+        case TSDataType::TEXT:
+            return 5;
+        default:
+            return -1;
     }
-    return sortedValues;
 }
 
 void Session::open()
@@ -555,6 +615,26 @@ void Session::insertRecord(string deviceId,  int64_t time, 
vector<string>& measu
     }
 }
 
+void Session::insertRecord(string deviceId,  int64_t time, vector<string>& 
measurements,
+    vector<TSDataType::TSDataType>& types, vector<char*>& values)
+{
+    shared_ptr<TSInsertRecordReq> req(new TSInsertRecordReq());
+    req->__set_sessionId(sessionId);
+    req->__set_deviceId(deviceId);
+    req->__set_timestamp(time);
+    req->__set_measurements(measurements);
+    string buffer;
+    putValuesIntoBuffer(types, values, buffer);
+    req->__set_values(buffer);
+    shared_ptr<TSStatus> resp(new TSStatus());
+    try {
+        client->insertRecord(*resp,*req);
+        RpcUtils::verifySuccess(*resp);
+    } catch (IoTDBConnectionException& e) {
+        throw IoTDBConnectionException(e.what());
+    }
+}
+
 void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times, 
vector<vector<string>>& measurementsList, vector<vector<string>>& valuesList) {
     int len = deviceIds.size();
     if (len != times.size() || len != measurementsList.size() || len != 
valuesList.size()) {
@@ -580,13 +660,90 @@ void Session::insertRecords(vector<string>& deviceIds, 
vector<int64_t>& times, v
     }
 }
 
+void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times,
+    vector<vector<string>>& measurementsList, 
vector<vector<TSDataType::TSDataType>> typesList,
+    vector<vector<char*>>& valuesList) {
+    int len = deviceIds.size();
+    if (len != times.size() || len != measurementsList.size() || len != 
valuesList.size()) {
+        logic_error e("deviceIds, times, measurementsList and valuesList's 
size should be equal");
+        throw exception(e);
+    }
+    shared_ptr<TSInsertRecordsReq> request(new TSInsertRecordsReq());
+    request->__set_sessionId(sessionId);
+    request->__set_deviceIds(deviceIds);
+    request->__set_timestamps(times);
+    request->__set_measurementsList(measurementsList);
+    vector<string> bufferList;
+    for (int i = 0; i < valuesList.size(); i++) {
+        string buffer;
+        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+        bufferList.push_back(buffer);
+    }
+    request->__set_valuesList(bufferList);
+
+    try {
+        shared_ptr<TSStatus> resp(new TSStatus());
+        client->insertRecords(*resp, *request);
+        RpcUtils::verifySuccess(*resp);
+    } catch (IoTDBConnectionException& e) {
+        throw IoTDBConnectionException(e.what());
+    }
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
+    vector<vector<string>> measurementsList, 
vector<vector<TSDataType::TSDataType>> typesList,
+    vector<vector<char*>>& valuesList) {
+    insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, 
valuesList, false);
+}
+
+void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
+    vector<vector<string>> measurementsList, 
vector<vector<TSDataType::TSDataType>> typesList,
+    vector<vector<char*>>& valuesList, bool sorted) {
+
+    if (sorted) {
+        if (!checkSorted(times)) {
+            throw BatchExecutionException("Times in InsertOneDeviceRecords are 
not in ascending order");
+        }
+    } else {
+        int* index = new int[times.size()];
+        for (int i = 0; i < times.size(); i++) {
+            index[i] = i;
+        }
+
+        this->sortIndexByTimestamp(index, times.data(), times.size());
+        sort(times.begin(), times.end());
+        measurementsList = sortList(measurementsList, index, times.size());
+        typesList = sortList(typesList, index, times.size());
+        valuesList = sortList(valuesList, index, times.size());
+    }
+    unique_ptr<TSInsertRecordsOfOneDeviceReq> request(new 
TSInsertRecordsOfOneDeviceReq());
+    request->__set_sessionId(sessionId);
+    request->__set_deviceId(deviceId);
+    request->__set_timestamps(times);
+    request->__set_measurementsList(measurementsList);
+    vector<string> bufferList;
+    for (int i = 0; i < valuesList.size(); i++) {
+        string buffer;
+        putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
+        bufferList.push_back(buffer);
+    }
+    request->__set_valuesList(bufferList);
+
+    try {
+        unique_ptr<TSStatus> resp(new TSStatus());
+        client->insertRecordsOfOneDevice(*resp, *request);
+        RpcUtils::verifySuccess(*resp);
+    } catch (const exception& e) {
+        throw IoTDBConnectionException(e.what());
+    }
+}
 
 void Session::insertTablet(Tablet& tablet) {
     try
     {
         insertTablet(tablet, false);
     }
-    catch (const std::exception& e)
+    catch (const exception& e)
     {
         logic_error error(e.what());
         throw exception(error);
@@ -630,7 +787,7 @@ void Session::insertTablets(map<string, Tablet*>& tablets) {
     {
         insertTablets(tablets, false);
     }
-    catch (const std::exception& e)
+    catch (const exception& e)
     {
         logic_error error(e.what());
         throw exception(error);
@@ -669,7 +826,7 @@ void Session::insertTablets(map<string, Tablet*>& tablets, 
bool sorted) {
             client->insertTablets(*resp, *request);
             RpcUtils::verifySuccess(*resp);
         }
-        catch (const std::exception& e)
+        catch (const exception& e)
         {
             throw IoTDBConnectionException(e.what());
         }
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 41a72f0..45ee51c 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -559,6 +559,14 @@ public:
     void closeOperationHandle();
 };
 
+template<typename T>
+std::vector<T> sortList(std::vector<T>& valueList, int* index, int 
indexLength) {
+    std::vector<T> sortedValues(valueList.size());
+    for (int i = 0; i < indexLength; i++) {
+        sortedValues[i] = valueList[index[i]];
+    }
+    return sortedValues;
+}
 
 class Session
 {
@@ -579,11 +587,14 @@ class Session
         const static int DEFAULT_TIMEOUT_MS = 0;
 
         bool checkSorted(Tablet& tablet);
+        bool checkSorted(std::vector<int64_t>& times);
         void sortTablet(Tablet& tablet);
-        std::vector<std::string> sortList(std::vector<std::string>& valueList, 
int* index, int indexLength);
         void sortIndexByTimestamp(int* index, int64_t* timestamps, int length);
         std::string getTimeZone();
         void setTimeZone(std::string zoneId);
+        void appendValues(std::string &buffer, char* value, int size);
+        void putValuesIntoBuffer(std::vector<TSDataType::TSDataType>& types, 
std::vector<char*>& values, std::string& buf);
+        int8_t getDataTypeNumber(TSDataType::TSDataType type);
     public:
         Session(std::string host, int rpcPort) : username("user"), 
password("password") {
             this->host = host;
@@ -623,8 +634,11 @@ class Session
         void open(bool enableRPCCompression, int connectionTimeoutInMs);
         void close();
         void insertRecord(std::string deviceId, int64_t time, 
std::vector<std::string>& measurements, std::vector<std::string>& values);
-        //void insertRecord(std::string deviceId, int64_t time, 
std::vector<std::string>& measurements, std::vector<TSDataType::Type>& types, )
+        void insertRecord(std::string deviceId, int64_t time, 
std::vector<std::string>& measurements, std::vector<TSDataType::TSDataType>& 
types, std::vector<char*>& values);
         void insertRecords(std::vector<std::string>& deviceIds, 
std::vector<int64_t>& times, std::vector<std::vector<std::string>>& 
measurementsList, std::vector<std::vector<std::string>>& valuesList);
+        void insertRecords(std::vector<std::string>& deviceIds, 
std::vector<int64_t>& times, std::vector<std::vector<std::string>>& 
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList, 
std::vector<std::vector<char*>>& valuesList);
+        void insertRecordsOfOneDevice(std::string deviceId, 
std::vector<int64_t>& times, std::vector<std::vector<std::string>> 
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList, 
std::vector<std::vector<char*>>& valuesList);
+        void insertRecordsOfOneDevice(std::string deviceId, 
std::vector<int64_t>& times, std::vector<std::vector<std::string>> 
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList, 
std::vector<std::vector<char*>>& valuesList, bool sorted);
         void insertTablet(Tablet& tablet);
         void insertTablet(Tablet& tablet, bool sorted);
         void insertTablets(std::map<std::string, Tablet*>& tablets);
diff --git a/client-cpp/src/test/cpp/sessionIT.cpp 
b/client-cpp/src/test/cpp/sessionIT.cpp
index d38b693..5d724c2 100644
--- a/client-cpp/src/test/cpp/sessionIT.cpp
+++ b/client-cpp/src/test/cpp/sessionIT.cpp
@@ -55,16 +55,10 @@ TEST_CASE( "Delete timeseries success", 
"[deleteTimeseries]" ) {
 TEST_CASE( "Test insertRecord by string", "[testInsertRecord]") {
   prepareTimeseries();
   string deviceId = "root.test.d1";
-  vector<string> measurements;
-  measurements.push_back("s1");
-  measurements.push_back("s2");
-  measurements.push_back("s3");
+  vector<string> measurements = { "s1", "s2", "s3" };
 
   for (long time = 0; time < 100; time++) {
-    vector<string> values;
-    values.push_back("1");
-    values.push_back("2");
-    values.push_back("3");
+    vector<string> values = { "1", "2", "3" };
     session->insertRecord(deviceId, time, measurements, values);
   }
 
@@ -87,20 +81,14 @@ TEST_CASE( "Test insertRecord by string", 
"[testInsertRecord]") {
 TEST_CASE( "Test insertRecords ", "[testInsertRecords]") {
   prepareTimeseries();
   string deviceId = "root.test.d1";
-  vector<string> measurements;
-  measurements.push_back("s1");
-  measurements.push_back("s2");
-  measurements.push_back("s3");
+  vector<string> measurements = { "s1", "s2", "s3" };
   vector<string> deviceIds;
   vector<vector<string>> measurementsList;
   vector<vector<string>> valuesList;
   vector<int64_t> timestamps;
 
   for (int64_t time = 0; time < 500; time++) {
-    vector<string> values;
-    values.push_back("1");
-    values.push_back("2");
-    values.push_back("3");
+    vector<string> values = { "1", "2", "3" };
 
     deviceIds.push_back(deviceId);
     measurementsList.push_back(measurements);
@@ -131,6 +119,119 @@ TEST_CASE( "Test insertRecords ", "[testInsertRecords]") {
   REQUIRE( count == 500 );
 }
 
+TEST_CASE( "Test insertRecord with types ", "[testTypedInsertRecord]") {
+  vector<string> timeseries = 
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+  vector<TSDataType::TSDataType> types = { TSDataType::INT32, 
TSDataType::DOUBLE, TSDataType::INT64 };
+
+  for (int i = 0; i < timeseries.size(); i++) {
+    if (session->checkTimeseriesExists(timeseries[i])) {
+      session->deleteTimeseries(timeseries[i]);
+    }
+    session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, 
CompressionType::SNAPPY);
+  }
+  string deviceId = "root.test.d1";
+  vector<string> measurements = { "s1", "s2", "s3" };
+  vector<int32_t> value1(100, 1);
+  vector<double> value2(100, 2.2);
+  vector<int64_t> value3(100, 3);
+
+  for (long time = 0; time < 100; time++) {
+    vector<char *> values = { (char *)(&value1[time]), (char 
*)(&value2[time]), (char *)(&value3[time]) };
+    session->insertRecord(deviceId, time, measurements, types, values);
+  }
+
+  SessionDataSet *sessionDataSet = session->executeQueryStatement("select 
s1,s2,s3 from root.test.d1");
+  sessionDataSet->setBatchSize(1024);
+  long count = 0;
+  while (sessionDataSet->hasNext()) {
+    sessionDataSet->next();
+    count++;
+  }
+  REQUIRE( count == 100 );
+}
+
+TEST_CASE( "Test insertRecords with types ", "[testTypedInsertRecords]") {
+  vector<string> timeseries = 
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+  vector<TSDataType::TSDataType> types = { TSDataType::INT32, 
TSDataType::DOUBLE, TSDataType::INT64 };
+
+  for (int i = 0; i < timeseries.size(); i++) {
+    if (session->checkTimeseriesExists(timeseries[i])) {
+      session->deleteTimeseries(timeseries[i]);
+    }
+    session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, 
CompressionType::SNAPPY);
+  }
+  string deviceId = "root.test.d1";
+  vector<string> measurements = { "s1", "s2", "s3" };
+  vector<string> deviceIds;
+  vector<vector<string>> measurementsList;
+  vector<vector<TSDataType::TSDataType>> typesList;
+  vector<vector<char*>> valuesList;
+  vector<int64_t> timestamps;
+  vector<int32_t> value1(100, 1);
+  vector<double> value2(100, 2.2);
+  vector<int64_t> value3(100, 3);
+
+  for (int64_t time = 0; time < 100; time++) {
+    vector<char *> values = { (char *)(&value1[time]), (char 
*)(&value2[time]), (char *)(&value3[time]) };
+    deviceIds.push_back(deviceId);
+    measurementsList.push_back(measurements);
+    typesList.push_back(types);
+    valuesList.push_back(values);
+    timestamps.push_back(time);
+  }
+
+  session->insertRecords(deviceIds, timestamps, measurementsList, typesList, 
valuesList);
+
+  SessionDataSet *sessionDataSet = session->executeQueryStatement("select * 
from root.test.d1");
+  sessionDataSet->setBatchSize(1024);
+  int count = 0;
+  while (sessionDataSet->hasNext()) {
+    sessionDataSet->next();
+    count++;
+  }
+  REQUIRE( count == 100 );
+}
+
+TEST_CASE( "Test insertRecordsOfOneDevice", "[testInsertRecordsOfOneDevice]") {
+  vector<string> timeseries = 
{"root.test.d1.s1","root.test.d1.s2","root.test.d1.s3"};
+  vector<TSDataType::TSDataType> types = { TSDataType::INT32, 
TSDataType::DOUBLE, TSDataType::INT64 };
+
+  for (int i = 0; i < timeseries.size(); i++) {
+    if (session->checkTimeseriesExists(timeseries[i])) {
+      session->deleteTimeseries(timeseries[i]);
+    }
+    session->createTimeseries(timeseries[i], types[i], TSEncoding::RLE, 
CompressionType::SNAPPY);
+  }
+  string deviceId = "root.test.d1";
+  vector<string> measurements = { "s1", "s2", "s3" };
+  vector<vector<string>> measurementsList;
+  vector<vector<TSDataType::TSDataType>> typesList;
+  vector<vector<char*>> valuesList;
+  vector<int64_t> timestamps;
+  vector<int32_t> value1(100, 1);
+  vector<double> value2(100, 2.2);
+  vector<int64_t> value3(100, 3);
+
+  for (int64_t time = 0; time < 100; time++) {
+    vector<char *> values = { (char *)(&value1[time]), (char 
*)(&value2[time]), (char *)(&value3[time]) };
+    measurementsList.push_back(measurements);
+    typesList.push_back(types);
+    valuesList.push_back(values);
+    timestamps.push_back(time);
+  }
+
+  session->insertRecordsOfOneDevice(deviceId, timestamps, measurementsList, 
typesList, valuesList);
+
+  SessionDataSet *sessionDataSet = session->executeQueryStatement("select * 
from root.test.d1");
+  sessionDataSet->setBatchSize(1024);
+  int count = 0;
+  while (sessionDataSet->hasNext()) {
+    sessionDataSet->next();
+    count++;
+  }
+  REQUIRE( count == 100 );
+}
+
 TEST_CASE( "Test insertTablet ", "[testInsertTablet]") {
   prepareTimeseries();
   string deviceId = "root.test.d1";

Reply via email to