This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 443845c [IOTDB-1542] Cpp client segment fault: char[] buffer overflow
caused by long exception message (#3671)
443845c is described below
commit 443845c249fe59149ab97af4670874092dc38995
Author: Steve Yurong Su (宇荣) <[email protected]>
AuthorDate: Tue Aug 3 03:46:56 2021 -0500
[IOTDB-1542] Cpp client segment fault: char[] buffer overflow caused by
long exception message (#3671)
---
client-cpp/src/main/Session.cpp | 754 ++++++++++++++++++----------------------
client-cpp/src/main/Session.h | 563 ++++++++++++++++--------------
2 files changed, 650 insertions(+), 667 deletions(-)
diff --git a/client-cpp/src/main/Session.cpp b/client-cpp/src/main/Session.cpp
index 1ae80c3..75713b2 100644
--- a/client-cpp/src/main/Session.cpp
+++ b/client-cpp/src/main/Session.cpp
@@ -23,33 +23,30 @@ using namespace std;
TSDataType::TSDataType getTSDataTypeFromString(string str) {
// BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
- if (str == "BOOLEAN") return TSDataType::BOOLEAN;
- else if(str == "INT32") return TSDataType::INT32;
- else if(str == "INT64") return TSDataType::INT64;
- else if(str == "FLOAT") return TSDataType::FLOAT;
- else if(str == "DOUBLE") return TSDataType::DOUBLE;
- else if(str == "TEXT") return TSDataType::TEXT;
- else if(str == "NULLTYPE") return TSDataType::NULLTYPE;
+ if (str == "BOOLEAN") return TSDataType::BOOLEAN;
+ else if (str == "INT32") return TSDataType::INT32;
+ else if (str == "INT64") return TSDataType::INT64;
+ else if (str == "FLOAT") return TSDataType::FLOAT;
+ else if (str == "DOUBLE") return TSDataType::DOUBLE;
+ else if (str == "TEXT") return TSDataType::TEXT;
+ else if (str == "NULLTYPE") return TSDataType::NULLTYPE;
return TSDataType::TEXT;
}
-void RpcUtils::verifySuccess(TSStatus& status) {
+void RpcUtils::verifySuccess(TSStatus &status) {
if (status.code == TSStatusCode::MULTIPLE_ERROR) {
verifySuccess(status.subStatus);
return;
}
if (status.code != TSStatusCode::SUCCESS_STATUS) {
- char buf[111];
- sprintf(buf, "%d: %s", status.code, status.message.c_str());
- throw IoTDBConnectionException(buf);
+ throw IoTDBConnectionException(to_string(status.code) + ": " +
status.message.c_str());
}
}
-void RpcUtils::verifySuccess(vector<TSStatus>& statuses) {
+
+void RpcUtils::verifySuccess(vector <TSStatus> &statuses) {
for (TSStatus status : statuses) {
if (status.code != TSStatusCode::SUCCESS_STATUS) {
- char buf[111];
- sprintf(buf, "%s", status.message.c_str());
- throw BatchExecutionException(statuses, buf);
+ throw BatchExecutionException(statuses, status.message);
}
}
}
@@ -59,36 +56,45 @@ TSStatus RpcUtils::getStatus(TSStatusCode::TSStatusCode
tsStatusCode) {
tmpTSStatus.__set_code(tsStatusCode);
return tmpTSStatus;
}
+
TSStatus RpcUtils::getStatus(int code, string message) {
TSStatus status = TSStatus();
status.__set_code(code);
status.__set_message(message);
return status;
}
-shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
+
+shared_ptr <TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSExecuteStatementResp(status);
}
-shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode,
string message) {
+
+shared_ptr <TSExecuteStatementResp>
+RpcUtils::getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode,
string message) {
TSStatus status = getStatus(tsStatusCode, message);
return getTSExecuteStatementResp(status);
}
-shared_ptr<TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatus& status) {
- shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+
+shared_ptr <TSExecuteStatementResp>
RpcUtils::getTSExecuteStatementResp(TSStatus &status) {
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
TSStatus tsStatus(status);
resp->status = status;
return resp;
}
-shared_ptr<TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
+
+shared_ptr <TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode) {
TSStatus status = getStatus(tsStatusCode);
return getTSFetchResultsResp(status);
}
-shared_ptr<TSFetchResultsResp>
RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, string
appendMessage) {
+
+shared_ptr <TSFetchResultsResp>
+RpcUtils::getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode,
string appendMessage) {
TSStatus status = getStatus(tsStatusCode, appendMessage);
return getTSFetchResultsResp(status);
}
-shared_ptr<TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatus&
status) {
- shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
+
+shared_ptr <TSFetchResultsResp> RpcUtils::getTSFetchResultsResp(TSStatus
&status) {
+ shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
TSStatus tsStatus(status);
resp->__set_status(tsStatus);
return resp;
@@ -116,37 +122,36 @@ int Tablet::getValueByteSize() {
int valueOccupation = 0;
for (int i = 0; i < schemas.size(); i++) {
switch (schemas[i].second) {
- case TSDataType::BOOLEAN:
- valueOccupation += rowSize;
- break;
- case TSDataType::INT32:
- valueOccupation += rowSize * 4;
- break;
- case TSDataType::INT64:
- valueOccupation += rowSize * 8;
- break;
- case TSDataType::FLOAT:
- valueOccupation += rowSize * 4;
- break;
- case TSDataType::DOUBLE:
- valueOccupation += rowSize * 8;
- break;
- case TSDataType::TEXT:
- valueOccupation += rowSize * 4;
- for (string value : values[i]) {
- valueOccupation += value.size();
- }
- break;
- default:
- char buf[111];
- sprintf(buf, "Data type %d is not supported.", schemas[i].second);
- throw UnSupportedDataTypeException(buf);
+ case TSDataType::BOOLEAN:
+ valueOccupation += rowSize;
+ break;
+ case TSDataType::INT32:
+ valueOccupation += rowSize * 4;
+ break;
+ case TSDataType::INT64:
+ valueOccupation += rowSize * 8;
+ break;
+ case TSDataType::FLOAT:
+ valueOccupation += rowSize * 4;
+ break;
+ case TSDataType::DOUBLE:
+ valueOccupation += rowSize * 8;
+ break;
+ case TSDataType::TEXT:
+ valueOccupation += rowSize * 4;
+ for (string value : values[i]) {
+ valueOccupation += value.size();
+ }
+ break;
+ default:
+ throw UnSupportedDataTypeException(
+ string("Data type ") + to_string(schemas[i].second) +
" is not supported.");
}
}
return valueOccupation;
}
-string SessionUtils::getTime(Tablet& tablet) {
+string SessionUtils::getTime(Tablet &tablet) {
MyStringBuffer timeBuffer;
for (int i = 0; i < tablet.rowSize; i++) {
timeBuffer.putLong(tablet.timestamps[i]);
@@ -154,78 +159,71 @@ string SessionUtils::getTime(Tablet& tablet) {
return timeBuffer.str;
}
-string SessionUtils::getValue(Tablet& tablet) {
+string SessionUtils::getValue(Tablet &tablet) {
MyStringBuffer valueBuffer;
for (int i = 0; i < tablet.schemas.size(); i++) {
TSDataType::TSDataType dataType = tablet.schemas[i].second;
- switch (dataType)
- {
- case TSDataType::BOOLEAN:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putBool(tablet.values[i][index] == "true" ? true :
false);
- }
- break;
- case TSDataType::INT32:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putInt(stoi(tablet.values[i][index]));
- }
- break;
- case TSDataType::INT64:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putLong(stol(tablet.values[i][index]));
- }
- break;
- case TSDataType::FLOAT:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putFloat(stof(tablet.values[i][index]));
- }
- break;
- case TSDataType::DOUBLE:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putDouble(stod(tablet.values[i][index]));
- }
- break;
- case TSDataType::TEXT:
- for (int index = 0; index < tablet.rowSize; index++) {
- valueBuffer.putString(tablet.values[i][index]);
- }
- break;
- default:
- char buf[111];
- sprintf(buf, "Data type %d is not supported.", dataType);
- throw UnSupportedDataTypeException(buf);
- break;
+ switch (dataType) {
+ case TSDataType::BOOLEAN:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putBool(tablet.values[i][index] == "true");
+ }
+ break;
+ case TSDataType::INT32:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putInt(stoi(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::INT64:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putLong(stol(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::FLOAT:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putFloat(stof(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::DOUBLE:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putDouble(stod(tablet.values[i][index]));
+ }
+ break;
+ case TSDataType::TEXT:
+ for (int index = 0; index < tablet.rowSize; index++) {
+ valueBuffer.putString(tablet.values[i][index]);
+ }
+ break;
+ default:
+ throw UnSupportedDataTypeException(string("Data type ") +
to_string(dataType) + " is not supported.");
}
}
return valueBuffer.str;
}
-int SessionDataSet::getBatchSize()
-{
+int SessionDataSet::getBatchSize() {
return batchSize;
}
-void SessionDataSet::setBatchSize(int batchSize)
-{
+void SessionDataSet::setBatchSize(int batchSize) {
this->batchSize = batchSize;
}
-vector<string> SessionDataSet::getColumnNames() { return this->columnNameList;
}
+vector <string> SessionDataSet::getColumnNames() { return
this->columnNameList; }
-bool SessionDataSet::hasNext()
-{
+bool SessionDataSet::hasNext() {
if (hasCachedRecord) {
return true;
}
if (!tsQueryDataSetTimeBuffer.hasRemaining()) {
- shared_ptr<TSFetchResultsReq> req(new TSFetchResultsReq());
+ shared_ptr <TSFetchResultsReq> req(new TSFetchResultsReq());
req->__set_sessionId(sessionId);
req->__set_statement(sql);
req->__set_fetchSize(batchSize);
req->__set_queryId(queryId);
req->__set_isAlign(true);
try {
- shared_ptr<TSFetchResultsResp> resp(new TSFetchResultsResp());
+ shared_ptr <TSFetchResultsResp> resp(new TSFetchResultsResp());
client->fetchResults(*resp, *req);
RpcUtils::verifySuccess(resp->status);
@@ -237,11 +235,9 @@ bool SessionDataSet::hasNext()
rowsIndex = 0;
}
}
- catch (IoTDBConnectionException e)
- {
- char buf[111];
- sprintf(buf, "Cannot fetch result from server, because of network
connection: %s", e.what());
- throw IoTDBConnectionException(buf);
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(
+ string("Cannot fetch result from server, because of
network connection: ") + e.what());
}
}
@@ -251,7 +247,7 @@ bool SessionDataSet::hasNext()
}
void SessionDataSet::constructOneRow() {
- vector<Field> outFields;
+ vector <Field> outFields;
int loc = 0;
for (int i = 0; i < columnSize; i++) {
Field field;
@@ -269,41 +265,40 @@ void SessionDataSet::constructOneRow() {
TSDataType::TSDataType dataType =
getTSDataTypeFromString(columnTypeDeduplicatedList[loc]);
field.dataType = dataType;
switch (dataType) {
- case TSDataType::BOOLEAN: {
- bool booleanValue = valueBuffer->getBool();
- field.boolV = booleanValue;
- break;
- }
- case TSDataType::INT32: {
- int intValue = valueBuffer->getInt();
- field.intV = intValue;
- break;
- }
- case TSDataType::INT64: {
- int64_t longValue = valueBuffer->getLong();
- field.longV = longValue;
- break;
- }
- case TSDataType::FLOAT: {
- float floatValue = valueBuffer->getFloat();
- field.floatV = floatValue;
- break;
- }
- case TSDataType::DOUBLE: {
- double doubleValue = valueBuffer->getDouble();
- field.doubleV = doubleValue;
- break;
- }
- case TSDataType::TEXT: {
- string stringValue = valueBuffer->getString();
- field.stringV = stringValue;
- break;
- }
- default: {
- char buf[111];
- sprintf(buf, "Data type %s is not supported.",
columnTypeDeduplicatedList[i].c_str());
- throw UnSupportedDataTypeException(buf);
- }
+ case TSDataType::BOOLEAN: {
+ bool booleanValue = valueBuffer->getBool();
+ field.boolV = booleanValue;
+ break;
+ }
+ case TSDataType::INT32: {
+ int intValue = valueBuffer->getInt();
+ field.intV = intValue;
+ break;
+ }
+ case TSDataType::INT64: {
+ int64_t longValue = valueBuffer->getLong();
+ field.longV = longValue;
+ break;
+ }
+ case TSDataType::FLOAT: {
+ float floatValue = valueBuffer->getFloat();
+ field.floatV = floatValue;
+ break;
+ }
+ case TSDataType::DOUBLE: {
+ double doubleValue = valueBuffer->getDouble();
+ field.doubleV = doubleValue;
+ break;
+ }
+ case TSDataType::TEXT: {
+ string stringValue = valueBuffer->getString();
+ field.stringV = stringValue;
+ break;
+ }
+ default: {
+ throw UnSupportedDataTypeException(
+ string("Data type ") +
columnTypeDeduplicatedList[i].c_str() + " is not supported.");
+ }
}
} else {
field.dataType = TSDataType::NULLTYPE;
@@ -323,8 +318,7 @@ bool SessionDataSet::isNull(int index, int rowNum) {
return ((flag >> shift) & bitmap) == 0;
}
-RowRecord* SessionDataSet::next()
-{
+RowRecord *SessionDataSet::next() {
if (!hasCachedRecord) {
if (!hasNext()) {
return NULL;
@@ -335,22 +329,18 @@ RowRecord* SessionDataSet::next()
return &rowRecord;
}
-void SessionDataSet::closeOperationHandle()
-{
- shared_ptr<TSCloseOperationReq> closeReq(new TSCloseOperationReq());
+void SessionDataSet::closeOperationHandle() {
+ shared_ptr <TSCloseOperationReq> closeReq(new TSCloseOperationReq());
closeReq->__set_sessionId(sessionId);
closeReq->__set_queryId(queryId);
- shared_ptr<TSStatus> closeResp(new TSStatus());
- try
- {
- client->closeOperation(*closeResp,*closeReq);
+ shared_ptr <TSStatus> closeResp(new TSStatus());
+ try {
+ client->closeOperation(*closeResp, *closeReq);
RpcUtils::verifySuccess(*closeResp);
}
- catch (IoTDBConnectionException e)
- {
- char buf[111];
- sprintf(buf,"Error occurs when connecting to server for close
operation, because: %s",e.what());
- throw IoTDBConnectionException(buf);
+ catch (IoTDBConnectionException e) {
+ throw IoTDBConnectionException(
+ string("Error occurs when connecting to server for close
operation, because: ") + e.what());
}
}
@@ -360,7 +350,7 @@ void SessionDataSet::closeOperationHandle()
*
* @return whether the batch has been sorted
*/
-bool Session::checkSorted(Tablet& tablet) {
+bool Session::checkSorted(Tablet &tablet) {
for (int i = 1; i < tablet.rowSize; i++) {
if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
return false;
@@ -369,7 +359,7 @@ bool Session::checkSorted(Tablet& tablet) {
return true;
}
-bool Session::checkSorted(vector<int64_t>& times) {
+bool Session::checkSorted(vector <int64_t> &times) {
for (int i = 1; i < times.size(); i++) {
if (times[i] < times[i - 1]) {
return false;
@@ -378,13 +368,13 @@ bool Session::checkSorted(vector<int64_t>& times) {
return true;
}
-void Session::sortTablet(Tablet& tablet) {
+void Session::sortTablet(Tablet &tablet) {
/*
* following part of code sort the batch data by time,
* so we can insert continuous data in value list to get a better
performance
*/
- // sort to get index, and use index to sort value list
- int* index = new int[tablet.rowSize];
+ // sort to get index, and use index to sort value list
+ int *index = new int[tablet.rowSize];
for (int i = 0; i < tablet.rowSize; i++) {
index[i] = i;
}
@@ -398,7 +388,7 @@ void Session::sortTablet(Tablet& tablet) {
delete[] index;
}
-void Session::sortIndexByTimestamp(int* index, std::vector<int64_t>&
timestamps, int length) {
+void Session::sortIndexByTimestamp(int *index, std::vector <int64_t>
&timestamps, int length) {
// Use Insert Sort Algorithm
if (length >= 2) {
for (int i = 1; i < length; i++) {
@@ -421,16 +411,16 @@ void Session::sortIndexByTimestamp(int* index,
std::vector<int64_t>& timestamps,
/**
* Append value into buffer in Big Endian order to comply with IoTDB server
*/
-void Session::appendValues(string &buffer, char* value, int size) {
+void Session::appendValues(string &buffer, char *value, int size) {
for (int i = size - 1; i >= 0; i--) {
buffer.append(value + i, 1);
}
}
-void Session::putValuesIntoBuffer(vector<TSDataType::TSDataType>& types,
vector<char*>& values, string& buf) {
+void Session::putValuesIntoBuffer(vector <TSDataType::TSDataType> &types,
vector<char *> &values, string &buf) {
for (int i = 0; i < values.size(); i++) {
int8_t typeNum = getDataTypeNumber(types[i]);
- buf.append((char*)(&typeNum), sizeof(int8_t));
+ buf.append((char *) (&typeNum), sizeof(int8_t));
switch (types[i]) {
case TSDataType::BOOLEAN:
buf.append(values[i], 1);
@@ -450,7 +440,7 @@ void
Session::putValuesIntoBuffer(vector<TSDataType::TSDataType>& types, vector<
case TSDataType::TEXT:
string str(values[i]);
int len = str.length();
- appendValues(buf, (char*)(&len), sizeof(int));
+ appendValues(buf, (char *) (&len), sizeof(int));
// no need to change the byte order of string value
buf.append(values[i], len);
break;
@@ -477,89 +467,73 @@ int8_t Session::getDataTypeNumber(TSDataType::TSDataType
type) {
}
}
-void Session::open()
-{
- try
- {
+void Session::open() {
+ try {
open(false, DEFAULT_TIMEOUT_MS);
}
- catch (IoTDBConnectionException e)
- {
+ catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
}
void Session::open(bool enableRPCCompression) {
- try
- {
+ try {
open(enableRPCCompression, DEFAULT_TIMEOUT_MS);
}
- catch (IoTDBConnectionException e)
- {
+ catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::open(bool enableRPCCompression, int connectionTimeoutInMs)
-{
- if (!isClosed)
- {
+void Session::open(bool enableRPCCompression, int connectionTimeoutInMs) {
+ if (!isClosed) {
return;
}
- shared_ptr<TSocket> socket(new TSocket(host, rpcPort));
- shared_ptr<TTransport> transport(new TFramedTransport(socket));
+ shared_ptr <TSocket> socket(new TSocket(host, rpcPort));
+ shared_ptr <TTransport> transport(new TFramedTransport(socket));
socket->setConnTimeout(connectionTimeoutInMs);
- if (!transport->isOpen())
- {
- try
- {
+ if (!transport->isOpen()) {
+ try {
transport->open();
- }
- catch (TTransportException e)
- {
+ }
+ catch (TTransportException e) {
throw IoTDBConnectionException(e.what());
}
}
- if (enableRPCCompression)
- {
- shared_ptr<TCompactProtocol> protocol(new TCompactProtocol(transport));
- shared_ptr<TSIServiceIf> client_instance(new
TSIServiceClient(protocol));
+ if (enableRPCCompression) {
+ shared_ptr <TCompactProtocol> protocol(new
TCompactProtocol(transport));
+ shared_ptr <TSIServiceIf> client_instance(new
TSIServiceClient(protocol));
client = client_instance;
} else {
- shared_ptr<TBinaryProtocol> protocol(new TBinaryProtocol(transport));
- shared_ptr<TSIServiceIf> client_instance(new
TSIServiceClient(protocol));
+ shared_ptr <TBinaryProtocol> protocol(new TBinaryProtocol(transport));
+ shared_ptr <TSIServiceIf> client_instance(new
TSIServiceClient(protocol));
client = client_instance;
}
- shared_ptr<TSOpenSessionReq> openReq(new TSOpenSessionReq());
+ shared_ptr <TSOpenSessionReq> openReq(new TSOpenSessionReq());
openReq->__set_username(username);
openReq->__set_password(password);
openReq->__set_zoneId(zoneId);
- try
- {
- shared_ptr<TSOpenSessionResp> openResp(new TSOpenSessionResp());
- client->openSession(*openResp,*openReq);
+ try {
+ shared_ptr <TSOpenSessionResp> openResp(new TSOpenSessionResp());
+ client->openSession(*openResp, *openReq);
RpcUtils::verifySuccess(openResp->status);
- if (protocolVersion != openResp->serverProtocolVersion)
- {
+ if (protocolVersion != openResp->serverProtocolVersion) {
if (openResp->serverProtocolVersion == 0) {// less than 0.10
- char buf[111];
- sprintf(buf, "Protocol not supported, Client version is %d,
but Server version is %d", protocolVersion, openResp->serverProtocolVersion);
- logic_error e(buf);
- throw exception(e);
+ throw logic_error(string("Protocol not supported, Client
version is ") + to_string(protocolVersion) +
+ ", but Server version is " +
to_string(openResp->serverProtocolVersion));
}
}
sessionId = openResp->sessionId;
statementId = client->requestStatementId(sessionId);
-
+
if (zoneId != "") {
setTimeZone(zoneId);
} else {
zoneId = getTimeZone();
}
}
- catch (exception e)
- {
+ catch (exception e) {
transport->close();
throw IoTDBConnectionException(e.what());
}
@@ -567,59 +541,47 @@ void Session::open(bool enableRPCCompression, int
connectionTimeoutInMs)
}
-
-void Session::close()
-{
- if (isClosed)
- {
+void Session::close() {
+ if (isClosed) {
return;
}
- shared_ptr<TSCloseSessionReq> req(new TSCloseSessionReq());
+ shared_ptr <TSCloseSessionReq> req(new TSCloseSessionReq());
req->__set_sessionId(sessionId);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
- client->closeSession(*resp,*req);
- }
- catch (exception e)
- {
- char buf[111];
- sprintf(buf,"Error occurs when closing session at server. Maybe server
is down. %s",e.what());
- throw IoTDBConnectionException(buf);
- }
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ client->closeSession(*resp, *req);
+ }
+ catch (exception e) {
+ throw IoTDBConnectionException(
+ string("Error occurs when closing session at server. Maybe
server is down. ") + e.what());
+ }
isClosed = true;
- if (transport != NULL)
- {
+ if (transport != NULL) {
transport->close();
}
}
-
-void Session::insertRecord(string deviceId, int64_t time, vector<string>&
measurements, vector<string>& values)
-{
- shared_ptr<TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
+void Session::insertRecord(string deviceId, int64_t time, vector <string>
&measurements, vector <string> &values) {
+ shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
req->__set_sessionId(sessionId);
req->__set_deviceId(deviceId);
req->__set_timestamp(time);
req->__set_measurements(measurements);
req->__set_values(values);
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
- client->insertStringRecord(*resp,*req);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->insertStringRecord(*resp, *req);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::insertRecord(string prefixPath, int64_t time, vector<string>&
measurements,
- vector<TSDataType::TSDataType>& types, vector<char*>& values)
-{
- shared_ptr<TSInsertRecordReq> req(new TSInsertRecordReq());
+void Session::insertRecord(string prefixPath, int64_t time, vector <string>
&measurements,
+ vector <TSDataType::TSDataType> &types, vector<char
*> &values) {
+ shared_ptr <TSInsertRecordReq> req(new TSInsertRecordReq());
req->__set_sessionId(sessionId);
req->__set_prefixPath(prefixPath);
req->__set_timestamp(time);
@@ -627,54 +589,55 @@ void Session::insertRecord(string prefixPath, int64_t
time, vector<string>& mea
string buffer;
putValuesIntoBuffer(types, values, buffer);
req->__set_values(buffer);
- shared_ptr<TSStatus> resp(new TSStatus());
+ shared_ptr <TSStatus> resp(new TSStatus());
try {
- client->insertRecord(*resp,*req);
+ client->insertRecord(*resp, *req);
RpcUtils::verifySuccess(*resp);
- } catch (IoTDBConnectionException& e) {
+ } catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times,
vector<vector<string>>& measurementsList, vector<vector<string>>& valuesList) {
+void
+Session::insertRecords(vector <string> &deviceIds, vector <int64_t> &times,
vector <vector<string>> &measurementsList,
+ vector <vector<string>> &valuesList) {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's
size should be equal");
throw exception(e);
}
- shared_ptr<TSInsertStringRecordsReq> request(new
TSInsertStringRecordsReq());
+ shared_ptr <TSInsertStringRecordsReq> request(new
TSInsertStringRecordsReq());
request->__set_sessionId(sessionId);
request->__set_deviceIds(deviceIds);
request->__set_timestamps(times);
request->__set_measurementsList(measurementsList);
request->__set_valuesList(valuesList);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->insertStringRecords(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::insertRecords(vector<string>& deviceIds, vector<int64_t>& times,
- vector<vector<string>>& measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
- vector<vector<char*>>& valuesList) {
+void Session::insertRecords(vector <string> &deviceIds, vector <int64_t>
&times,
+ vector <vector<string>> &measurementsList,
+ vector <vector<TSDataType::TSDataType>> typesList,
+ vector <vector<char *>> &valuesList) {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
logic_error e("deviceIds, times, measurementsList and valuesList's
size should be equal");
throw exception(e);
}
- shared_ptr<TSInsertRecordsReq> request(new TSInsertRecordsReq());
+ shared_ptr <TSInsertRecordsReq> request(new TSInsertRecordsReq());
request->__set_sessionId(sessionId);
request->__set_deviceIds(deviceIds);
request->__set_timestamps(times);
request->__set_measurementsList(measurementsList);
- vector<string> bufferList;
+ vector <string> bufferList;
for (int i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -683,30 +646,32 @@ void Session::insertRecords(vector<string>& deviceIds,
vector<int64_t>& times,
request->__set_valuesList(bufferList);
try {
- shared_ptr<TSStatus> resp(new TSStatus());
+ shared_ptr <TSStatus> resp(new TSStatus());
client->insertRecords(*resp, *request);
RpcUtils::verifySuccess(*resp);
- } catch (IoTDBConnectionException& e) {
+ } catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
- vector<vector<string>> measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
- vector<vector<char*>>& valuesList) {
+void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t>
&times,
+ vector <vector<string>>
measurementsList,
+ vector <vector<TSDataType::TSDataType>>
typesList,
+ vector <vector<char *>> &valuesList) {
insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList,
valuesList, false);
}
-void Session::insertRecordsOfOneDevice(string deviceId, vector<int64_t>& times,
- vector<vector<string>> measurementsList,
vector<vector<TSDataType::TSDataType>> typesList,
- vector<vector<char*>>& valuesList, bool sorted) {
+void Session::insertRecordsOfOneDevice(string deviceId, vector <int64_t>
&times,
+ vector <vector<string>>
measurementsList,
+ vector <vector<TSDataType::TSDataType>>
typesList,
+ vector <vector<char *>> &valuesList,
bool sorted) {
if (sorted) {
if (!checkSorted(times)) {
throw BatchExecutionException("Times in InsertOneDeviceRecords are
not in ascending order");
}
} else {
- int* index = new int[times.size()];
+ int *index = new int[times.size()];
for (int i = 0; i < times.size(); i++) {
index[i] = i;
}
@@ -718,12 +683,12 @@ void Session::insertRecordsOfOneDevice(string deviceId,
vector<int64_t>& times,
valuesList = sortList(valuesList, index, times.size());
delete[] index;
}
- unique_ptr<TSInsertRecordsOfOneDeviceReq> request(new
TSInsertRecordsOfOneDeviceReq());
+ unique_ptr <TSInsertRecordsOfOneDeviceReq> request(new
TSInsertRecordsOfOneDeviceReq());
request->__set_sessionId(sessionId);
request->__set_deviceId(deviceId);
request->__set_timestamps(times);
request->__set_measurementsList(measurementsList);
- vector<string> bufferList;
+ vector <string> bufferList;
for (int i = 0; i < valuesList.size(); i++) {
string buffer;
putValuesIntoBuffer(typesList[i], valuesList[i], buffer);
@@ -732,27 +697,25 @@ void Session::insertRecordsOfOneDevice(string deviceId,
vector<int64_t>& times,
request->__set_valuesList(bufferList);
try {
- unique_ptr<TSStatus> resp(new TSStatus());
+ unique_ptr <TSStatus> resp(new TSStatus());
client->insertRecordsOfOneDevice(*resp, *request);
RpcUtils::verifySuccess(*resp);
- } catch (const exception& e) {
+ } catch (const exception &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::insertTablet(Tablet& tablet) {
- try
- {
+void Session::insertTablet(Tablet &tablet) {
+ try {
insertTablet(tablet, false);
}
- catch (const exception& e)
- {
+ catch (const exception &e) {
logic_error error(e.what());
throw exception(error);
}
}
-void Session::insertTablet(Tablet& tablet, bool sorted) {
+void Session::insertTablet(Tablet &tablet, bool sorted) {
if (sorted) {
if (!checkSorted(tablet)) {
throw BatchExecutionException("Times in Tablet are not in
ascending order");
@@ -761,10 +724,10 @@ void Session::insertTablet(Tablet& tablet, bool sorted) {
sortTablet(tablet);
}
- shared_ptr<TSInsertTabletReq> request(new TSInsertTabletReq());
+ shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
request->__set_sessionId(sessionId);
request->prefixPath = tablet.deviceId;
- for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
+ for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
request->measurements.push_back(schema.first);
request->types.push_back(schema.second);
}
@@ -772,32 +735,28 @@ void Session::insertTablet(Tablet& tablet, bool sorted) {
request->__set_values(SessionUtils::getValue(tablet));
request->__set_size(tablet.rowSize);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->insertTablet(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw new IoTDBConnectionException(e.what());
}
}
-void Session::insertTablets(map<string, Tablet*>& tablets) {
- try
- {
+void Session::insertTablets(map<string, Tablet *> &tablets) {
+ try {
insertTablets(tablets, false);
}
- catch (const exception& e)
- {
+ catch (const exception &e) {
logic_error error(e.what());
throw exception(error);
}
}
-void Session::insertTablets(map<string, Tablet*>& tablets, bool sorted) {
- shared_ptr<TSInsertTabletsReq> request(new TSInsertTabletsReq());
+void Session::insertTablets(map<string, Tablet *> &tablets, bool sorted) {
+ shared_ptr <TSInsertTabletsReq> request(new TSInsertTabletsReq());
request->__set_sessionId(sessionId);
for (auto &item : tablets) {
@@ -810,9 +769,9 @@ void Session::insertTablets(map<string, Tablet*>& tablets,
bool sorted) {
}
request->deviceIds.push_back(item.second->deviceId);
- vector<string> measurements;
+ vector <string> measurements;
vector<int> dataTypes;
- for (pair<string, TSDataType::TSDataType> schema :
item.second->schemas) {
+ for (pair <string, TSDataType::TSDataType> schema :
item.second->schemas) {
measurements.push_back(schema.first);
dataTypes.push_back(schema.second);
}
@@ -822,43 +781,39 @@ void Session::insertTablets(map<string, Tablet*>&
tablets, bool sorted) {
request->valuesList.push_back(SessionUtils::getValue(*(item.second)));
request->sizeList.push_back(item.second->rowSize);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->insertTablets(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (const exception& e)
- {
+ catch (const exception &e) {
throw IoTDBConnectionException(e.what());
}
}
}
-void Session::testInsertRecord(string deviceId, int64_t time, vector<string>&
measurements, vector<string>& values) {
- shared_ptr<TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
+void Session::testInsertRecord(string deviceId, int64_t time, vector <string>
&measurements, vector <string> &values) {
+ shared_ptr <TSInsertStringRecordReq> req(new TSInsertStringRecordReq());
req->__set_sessionId(sessionId);
req->__set_deviceId(deviceId);
req->__set_timestamp(time);
req->__set_measurements(measurements);
req->__set_values(values);
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
client->insertStringRecord(*resp, *req);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException e)
- {
+ catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::testInsertTablet(Tablet& tablet) {
- shared_ptr<TSInsertTabletReq> request(new TSInsertTabletReq());
+void Session::testInsertTablet(Tablet &tablet) {
+ shared_ptr <TSInsertTabletReq> request(new TSInsertTabletReq());
request->__set_sessionId(sessionId);
request->prefixPath = tablet.deviceId;
- for (pair<string, TSDataType::TSDataType> schema : tablet.schemas) {
+ for (pair <string, TSDataType::TSDataType> schema : tablet.schemas) {
request->measurements.push_back(schema.first);
request->types.push_back(schema.second);
}
@@ -866,139 +821,121 @@ void Session::testInsertTablet(Tablet& tablet) {
request->__set_values(SessionUtils::getValue(tablet));
request->__set_size(tablet.rowSize);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->testInsertTablet(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw new IoTDBConnectionException(e.what());
}
}
-void Session::testInsertRecords(vector<string>& deviceIds, vector<int64_t>&
times, vector<vector<string>>& measurementsList, vector<vector<string>>&
valuesList) {
+void Session::testInsertRecords(vector <string> &deviceIds, vector <int64_t>
&times,
+ vector <vector<string>> &measurementsList,
vector <vector<string>> &valuesList) {
int len = deviceIds.size();
if (len != times.size() || len != measurementsList.size() || len !=
valuesList.size()) {
logic_error error("deviceIds, times, measurementsList and valuesList's
size should be equal");
throw exception(error);
}
- shared_ptr<TSInsertStringRecordsReq> request(new
TSInsertStringRecordsReq());
+ shared_ptr <TSInsertStringRecordsReq> request(new
TSInsertStringRecordsReq());
request->__set_sessionId(sessionId);
request->__set_deviceIds(deviceIds);
request->__set_timestamps(times);
request->__set_measurementsList(measurementsList);
request->__set_valuesList(valuesList);
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->insertStringRecords(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::deleteTimeseries(string path)
-{
- vector<string> paths;
+void Session::deleteTimeseries(string path) {
+ vector <string> paths;
paths.push_back(path);
deleteTimeseries(paths);
}
-void Session::deleteTimeseries(vector<string>& paths)
-{
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
+void Session::deleteTimeseries(vector <string> &paths) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
client->deleteTimeseries(*resp, sessionId, paths);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::deleteData(string path, int64_t time)
-{
- vector<string> paths;
+void Session::deleteData(string path, int64_t time) {
+ vector <string> paths;
paths.push_back(path);
deleteData(paths, time);
}
-void Session::deleteData(vector<string>& deviceId, int64_t time)
-{
- shared_ptr<TSDeleteDataReq> req(new TSDeleteDataReq());
+void Session::deleteData(vector <string> &deviceId, int64_t time) {
+ shared_ptr <TSDeleteDataReq> req(new TSDeleteDataReq());
req->__set_sessionId(sessionId);
req->__set_paths(deviceId);
req->__set_endTime(time);
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
- client->deleteData(*resp,*req);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->deleteData(*resp, *req);
RpcUtils::verifySuccess(*resp);
}
- catch (exception& e)
- {
+ catch (exception &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::setStorageGroup(string storageGroupId)
-{
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
- client->setStorageGroup(*resp,sessionId, storageGroupId);
+void Session::setStorageGroup(string storageGroupId) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->setStorageGroup(*resp, sessionId, storageGroupId);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::deleteStorageGroup(string storageGroup)
-{
- vector<string> storageGroups;
+void Session::deleteStorageGroup(string storageGroup) {
+ vector <string> storageGroups;
storageGroups.push_back(storageGroup);
deleteStorageGroups(storageGroups);
}
-void Session::deleteStorageGroups(vector<string>& storageGroups)
-{
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
+void Session::deleteStorageGroups(vector <string> &storageGroups) {
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
client->deleteStorageGroups(*resp, sessionId, storageGroups);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::createTimeseries(string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding, CompressionType::CompressionType compressor) {
- try
- {
+void Session::createTimeseries(string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor) {
+ try {
createTimeseries(path, dataType, encoding, compressor, NULL, NULL,
NULL, "");
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::createTimeseries(string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding, CompressionType::CompressionType compressor,
- map<string, string>* props, map<string, string>* tags, map<string,
string>* attributes, string measurementAlias)
-{
- shared_ptr<TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
+void Session::createTimeseries(string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor,
+ map <string, string> *props, map <string,
string> *tags,
+ map <string, string> *attributes, string
measurementAlias) {
+ shared_ptr <TSCreateTimeseriesReq> req(new TSCreateTimeseriesReq());
req->__set_sessionId(sessionId);
req->__set_path(path);
req->__set_dataType(dataType);
@@ -1018,21 +955,23 @@ void Session::createTimeseries(string path,
TSDataType::TSDataType dataType, TSE
req->__set_measurementAlias(measurementAlias);
}
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
- client->createTimeseries(*resp,*req);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->createTimeseries(*resp, *req);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
-void Session::createMultiTimeseries(vector<string> paths,
vector<TSDataType::TSDataType> dataTypes, vector<TSEncoding::TSEncoding>
encodings, vector<CompressionType::CompressionType> compressors,
- vector<map<string, string>>* propsList, vector<map<string, string>>*
tagsList, vector<map<string, string>>* attributesList, vector<string>*
measurementAliasList) {
- shared_ptr<TSCreateMultiTimeseriesReq> request(new
TSCreateMultiTimeseriesReq());
+void Session::createMultiTimeseries(vector <string> paths, vector
<TSDataType::TSDataType> dataTypes,
+ vector <TSEncoding::TSEncoding> encodings,
+ vector <CompressionType::CompressionType>
compressors,
+ vector <map<string, string>> *propsList,
vector <map<string, string>> *tagsList,
+ vector <map<string, string>>
*attributesList,
+ vector <string> *measurementAliasList) {
+ shared_ptr <TSCreateMultiTimeseriesReq> request(new
TSCreateMultiTimeseriesReq());
request->__set_sessionId(sessionId);
request->__set_paths(paths);
@@ -1068,14 +1007,12 @@ void Session::createMultiTimeseries(vector<string>
paths, vector<TSDataType::TSD
request->__set_measurementAliasList(*measurementAliasList);
}
- try
- {
- shared_ptr<TSStatus> resp(new TSStatus());
+ try {
+ shared_ptr <TSStatus> resp(new TSStatus());
client->createMultiTimeseries(*resp, *request);
RpcUtils::verifySuccess(*resp);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
}
@@ -1090,79 +1027,66 @@ bool Session::checkTimeseriesExists(string path) {
}
}
-string Session::getTimeZone()
-{
- if (zoneId != "")
- {
+string Session::getTimeZone() {
+ if (zoneId != "") {
return zoneId;
}
- shared_ptr<TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
- try
- {
+ shared_ptr <TSGetTimeZoneResp> resp(new TSGetTimeZoneResp());
+ try {
client->getTimeZone(*resp, sessionId);
RpcUtils::verifySuccess(resp->status);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
return resp->timeZone;
}
-void Session::setTimeZone(string zoneId)
-{
- shared_ptr<TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
+void Session::setTimeZone(string zoneId) {
+ shared_ptr <TSSetTimeZoneReq> req(new TSSetTimeZoneReq());
req->__set_sessionId(sessionId);
req->__set_timeZone(zoneId);
- shared_ptr<TSStatus> resp(new TSStatus());
- try
- {
- client->setTimeZone(*resp,*req);
+ shared_ptr <TSStatus> resp(new TSStatus());
+ try {
+ client->setTimeZone(*resp, *req);
}
- catch (IoTDBConnectionException& e)
- {
+ catch (IoTDBConnectionException &e) {
throw IoTDBConnectionException(e.what());
}
RpcUtils::verifySuccess(*resp);
this->zoneId = zoneId;
}
-unique_ptr<SessionDataSet> Session::executeQueryStatement(string sql)
-{
- shared_ptr<TSExecuteStatementReq> req(new TSExecuteStatementReq());
+unique_ptr <SessionDataSet> Session::executeQueryStatement(string sql) {
+ shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
req->__set_sessionId(sessionId);
req->__set_statementId(statementId);
req->__set_statement(sql);
req->__set_fetchSize(fetchSize);
- shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
- try
- {
- client->executeStatement(*resp,*req);
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ try {
+ client->executeStatement(*resp, *req);
RpcUtils::verifySuccess(resp->status);
}
- catch (IoTDBConnectionException e)
- {
+ catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
- shared_ptr<TSQueryDataSet> queryDataSet(new
TSQueryDataSet(resp->queryDataSet));
+ shared_ptr <TSQueryDataSet> queryDataSet(new
TSQueryDataSet(resp->queryDataSet));
return unique_ptr<SessionDataSet>(new SessionDataSet(
- sql, resp->columns, resp->dataTypeList, resp->queryId, client,
sessionId, queryDataSet));
+ sql, resp->columns, resp->dataTypeList, resp->queryId, client,
sessionId, queryDataSet));
}
-void Session::executeNonQueryStatement(string sql)
-{
- shared_ptr<TSExecuteStatementReq> req(new TSExecuteStatementReq());
+void Session::executeNonQueryStatement(string sql) {
+ shared_ptr <TSExecuteStatementReq> req(new TSExecuteStatementReq());
req->__set_sessionId(sessionId);
req->__set_statementId(statementId);
req->__set_statement(sql);
- shared_ptr<TSExecuteStatementResp> resp(new TSExecuteStatementResp());
- try
- {
- client->executeUpdateStatement(*resp,*req);
+ shared_ptr <TSExecuteStatementResp> resp(new TSExecuteStatementResp());
+ try {
+ client->executeUpdateStatement(*resp, *req);
RpcUtils::verifySuccess(resp->status);
}
- catch (IoTDBConnectionException e)
- {
+ catch (IoTDBConnectionException e) {
throw IoTDBConnectionException(e.what());
}
}
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 3cbd320..620279f 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -46,59 +46,63 @@ using ::apache::thrift::transport::TBufferedTransport;
using ::apache::thrift::transport::TFramedTransport;
using ::apache::thrift::TException;
-class IoTDBConnectionException : public std::exception
-{
- public:
- IoTDBConnectionException() : message() {}
- IoTDBConnectionException(const char* m) : message(m) {}
- IoTDBConnectionException(std::string m) : message(m) {}
- virtual const char* what() const throw ()
- {
- return message.c_str();
- }
+class IoTDBConnectionException : public std::exception {
+public:
+ IoTDBConnectionException() : message() {}
+
+ IoTDBConnectionException(const char *m) : message(m) {}
+
+ IoTDBConnectionException(std::string m) : message(m) {}
+
+ virtual const char *what() const throw() {
+ return message.c_str();
+ }
- private:
- std::string message;
+private:
+ std::string message;
};
-class BatchExecutionException : public std::exception
-{
+class BatchExecutionException : public std::exception {
public:
BatchExecutionException() : message() {}
- BatchExecutionException(const char* m) : message(m) {}
+
+ BatchExecutionException(const char *m) : message(m) {}
+
BatchExecutionException(std::string m) : message(m) {}
- BatchExecutionException(std::vector<TSStatus> statusList) : message(),
statusList(statusList) {}
- BatchExecutionException(std::vector<TSStatus> statusList, std::string m) :
message(m), statusList(statusList) {}
- virtual const char* what() const throw ()
- {
+
+ BatchExecutionException(std::vector <TSStatus> statusList) : message(),
statusList(statusList) {}
+
+ BatchExecutionException(std::vector <TSStatus> statusList, std::string m)
: message(m), statusList(statusList) {}
+
+ virtual const char *what() const throw() {
return message.c_str();
}
- std::vector<TSStatus> statusList;
+
+ std::vector <TSStatus> statusList;
private:
std::string message;
};
-class UnSupportedDataTypeException : public std::exception
-{
+class UnSupportedDataTypeException : public std::exception {
private:
std::string message;
public:
UnSupportedDataTypeException() : message() {}
- UnSupportedDataTypeException(const char* m) : message(m) {}
+
+ UnSupportedDataTypeException(const char *m) : message(m) {}
+
UnSupportedDataTypeException(std::string m) : message("UnSupported
dataType: " + m) {}
};
-namespace CompressionType{
+namespace CompressionType {
- enum CompressionType
- {
+ enum CompressionType {
UNCOMPRESSED, SNAPPY, GZIP, LZO, SDT, PAA, PLA, LZ4
};
}
-namespace TSDataType{
- enum TSDataType
- {
+namespace TSDataType {
+ enum TSDataType {
BOOLEAN, INT32, INT64, FLOAT, DOUBLE, TEXT, NULLTYPE
};
}
@@ -170,48 +174,59 @@ namespace TSStatusCode {
TIME_OUT = 701,
NO_LEADER = 702,
UNSUPPORTED_OPERATION = 703,
- NODE_READ_ONLY= 704,
+ NODE_READ_ONLY = 704,
INCOMPATIBLE_VERSION = 203,
};
}
-class RpcUtils
-{
+class RpcUtils {
public:
- std::shared_ptr<TSStatus> SUCCESS_STATUS;
+ std::shared_ptr <TSStatus> SUCCESS_STATUS;
+
RpcUtils() {
SUCCESS_STATUS = std::make_shared<TSStatus>();
SUCCESS_STATUS->__set_code(TSStatusCode::SUCCESS_STATUS);
}
- static void verifySuccess(TSStatus& status);
- static void verifySuccess(std::vector<TSStatus>& statuses);
+
+ static void verifySuccess(TSStatus &status);
+
+ static void verifySuccess(std::vector <TSStatus> &statuses);
+
static TSStatus getStatus(TSStatusCode::TSStatusCode tsStatusCode);
+
static TSStatus getStatus(int code, std::string message);
- static std::shared_ptr<TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
- static std::shared_ptr<TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode, std::string
message);
- static std::shared_ptr<TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatus& status);
- static std::shared_ptr<TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
- static std::shared_ptr<TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, std::string
appendMessage);
- static std::shared_ptr<TSFetchResultsResp> getTSFetchResultsResp(TSStatus&
status);
+
+ static std::shared_ptr <TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode);
+
+ static std::shared_ptr <TSExecuteStatementResp>
+ getTSExecuteStatementResp(TSStatusCode::TSStatusCode tsStatusCode,
std::string message);
+
+ static std::shared_ptr <TSExecuteStatementResp>
getTSExecuteStatementResp(TSStatus &status);
+
+ static std::shared_ptr <TSFetchResultsResp>
getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode);
+
+ static std::shared_ptr <TSFetchResultsResp>
+ getTSFetchResultsResp(TSStatusCode::TSStatusCode tsStatusCode, std::string
appendMessage);
+
+ static std::shared_ptr <TSFetchResultsResp> getTSFetchResultsResp(TSStatus
&status);
};
// Simulate the ByteBuffer class in Java
class MyStringBuffer {
private:
- char* getchar(int len)
- {
- char* ret = new char[len];
+ char *getchar(int len) {
+ char *ret = new char[len];
for (int i = pos; i < pos + len; i++)
ret[pos + len - 1 - i] = str[i];
pos += len;
return ret;
}
- void putchar(int len, char* ins)
- {
+ void putchar(int len, char *ins) {
for (int i = len - 1; i > -1; i--)
str += ins[i];
}
+
public:
std::string str;
int pos;
@@ -227,112 +242,91 @@ public:
this->pos = 0;
}
- //byte get() {
- // char tmpChar = getChar();
- // return (byte)tmpChar;
- //}
-
- int getInt()
- {
- char* data = getchar(4);
- int ret = *(int*)data;
+ int getInt() {
+ char *data = getchar(4);
+ int ret = *(int *) data;
delete[]data;
return ret;
}
- int64_t getLong()
- {
- char* data = getchar(8);
- int64_t ret = *(int64_t*)data;
+ int64_t getLong() {
+ char *data = getchar(8);
+ int64_t ret = *(int64_t *) data;
delete[]data;
return ret;
}
- float getFloat()
- {
- char* data = getchar(4);
- float ret = *(float*)data;
+ float getFloat() {
+ char *data = getchar(4);
+ float ret = *(float *) data;
delete[]data;
return ret;
}
- double getDouble()
- {
- char* data = getchar(8);
- double ret = *(double*)data;
+ double getDouble() {
+ char *data = getchar(8);
+ double ret = *(double *) data;
delete[]data;
return ret;
}
- char getChar()
- {
- char* data = getchar(1);
- char ret = *(char*)data;
+ char getChar() {
+ char *data = getchar(1);
+ char ret = *(char *) data;
delete[]data;
return ret;
}
- bool getBool()
- {
- char bo = getChar();
- return bo == 1;
+ bool getBool() {
+ return getChar() == 1;
}
- std::string getString()
- {
+ std::string getString() {
int len = getInt();
std::string ret;
for (int i = 0; i < len; i++) ret.append(1, getChar());
return ret;
}
- void putInt(int ins)
- {
- char* data = (char*)&ins;
+ void putInt(int ins) {
+ char *data = (char *) &ins;
putchar(4, data);
}
- void putLong(int64_t ins)
- {
- char* data = (char*)&ins;
+ void putLong(int64_t ins) {
+ char *data = (char *) &ins;
putchar(8, data);
}
- void putFloat(float ins)
- {
- char* data = (char*)&ins;
+ void putFloat(float ins) {
+ char *data = (char *) &ins;
putchar(4, data);
}
- void putDouble(double ins)
- {
- char* data = (char*)&ins;
+ void putDouble(double ins) {
+ char *data = (char *) &ins;
putchar(8, data);
}
- void putChar(char ins)
- {
- char* data = (char*)&ins;
+ void putChar(char ins) {
+ char *data = (char *) &ins;
putchar(1, data);
}
- void putBool(bool ins)
- {
+ void putBool(bool ins) {
char tmp = 0;
if (ins) tmp = 1;
putChar(tmp);
}
- void putString(std::string ins)
- {
+ void putString(std::string ins) {
int len = ins.size();
putInt(len);
for (int i = 0; i < len; i++) putChar(ins[i]);
}
};
-class Field
-{
+class Field {
public:
TSDataType::TSDataType dataType;
bool boolV;
@@ -341,11 +335,12 @@ public:
float floatV;
double doubleV;
std::string stringV;
- Field(TSDataType::TSDataType a)
- {
+
+ Field(TSDataType::TSDataType a) {
dataType = a;
}
- Field(){}
+
+ Field() {}
};
/*
@@ -367,13 +362,14 @@ private:
static const int DEFAULT_SIZE = 1024;
public:
std::string deviceId; // deviceId of this tablet
- std::vector<std::pair<std::string, TSDataType::TSDataType>> schemas; //
the list of measurement schemas for creating the tablet
+ std::vector <std::pair<std::string, TSDataType::TSDataType>> schemas; //
the list of measurement schemas for creating the tablet
std::vector <int64_t> timestamps; //timestamps in this tablet
- std::vector<std::vector<std::string>> values;
+ std::vector <std::vector<std::string>> values;
int rowSize; //the number of rows to include in this tablet
int maxRowNumber; // the maximum number of rows for this tablet
- Tablet(){}
+ Tablet() {}
+
/**
* Return a tablet with default specified row number. This is the standard
* constructor (all Tablet should be the same size).
@@ -381,7 +377,7 @@ public:
* @param deviceId the name of the device specified to be written in
* @param timeseries the list of measurement schemas for creating the tablet
*/
- Tablet(std::string deviceId, std::vector<std::pair<std::string,
TSDataType::TSDataType>>& timeseries) {
+ Tablet(std::string deviceId, std::vector <std::pair<std::string,
TSDataType::TSDataType>> &timeseries) {
Tablet(deviceId, timeseries, DEFAULT_SIZE);
}
@@ -395,7 +391,8 @@ public:
* batch
* @param maxRowNumber the maximum number of rows for this tablet
*/
- Tablet(std::string deviceId, std::vector<std::pair<std::string,
TSDataType::TSDataType>>& schemas, int maxRowNumber) {
+ Tablet(std::string deviceId, std::vector <std::pair<std::string,
TSDataType::TSDataType>> &schemas,
+ int maxRowNumber) {
this->deviceId = deviceId;
this->schemas = schemas;
this->maxRowNumber = maxRowNumber;
@@ -413,31 +410,34 @@ public:
void reset(); // Reset Tablet to the default state - set the rowSize to 0
void createColumns();
+
int getTimeBytesSize();
+
int getValueByteSize(); // total byte size that values occupies
};
class SessionUtils {
public:
- static std::string getTime(Tablet& tablet);
- static std::string getValue(Tablet& tablet);
+ static std::string getTime(Tablet &tablet);
+
+ static std::string getValue(Tablet &tablet);
};
-class RowRecord
-{
+class RowRecord {
public:
int64_t timestamp;
- std::vector<Field> fields;
- RowRecord(int64_t timestamp)
- {
+ std::vector <Field> fields;
+
+ RowRecord(int64_t timestamp) {
this->timestamp = timestamp;
}
- RowRecord(int64_t timestamp, std::vector<Field> &fields) {
+
+ RowRecord(int64_t timestamp, std::vector <Field> &fields) {
this->timestamp = timestamp;
this->fields = fields;
}
- RowRecord()
- {
+
+ RowRecord() {
this->timestamp = -1;
}
@@ -445,51 +445,38 @@ public:
this->fields.push_back(f);
}
- std::string toString()
- {
- char buf[111];
- sprintf(buf,"%lld",timestamp);
- std::string ret = buf;
- for (int i = 0; i < fields.size(); i++)
- {
+ std::string toString() {
+ std::string ret = std::to_string(timestamp);
+ for (int i = 0; i < fields.size(); i++) {
ret.append("\t");
TSDataType::TSDataType dataType = fields[i].dataType;
- switch (dataType)
- {
- case TSDataType::BOOLEAN:{
- if (fields[i].boolV) ret.append("true");
- else ret.append("false");
+ switch (dataType) {
+ case TSDataType::BOOLEAN: {
+ std::string field = fields[i].boolV ? "true" : "false";
+ ret.append(field);
break;
}
- case TSDataType::INT32:{
- char buf[111];
- sprintf(buf,"%d",fields[i].intV);
- ret.append(buf);
+ case TSDataType::INT32: {
+ ret.append(std::to_string(fields[i].intV));
break;
}
case TSDataType::INT64: {
- char buf[111];
- sprintf(buf,"%lld",fields[i].longV);
- ret.append(buf);
+ ret.append(std::to_string(fields[i].longV));
break;
}
- case TSDataType::FLOAT:{
- char buf[111];
- sprintf(buf,"%f",fields[i].floatV);
- ret.append(buf);
+ case TSDataType::FLOAT: {
+ ret.append(std::to_string(fields[i].floatV));
break;
}
- case TSDataType::DOUBLE:{
- char buf[111];
- sprintf(buf,"%lf",fields[i].doubleV);
- ret.append(buf);
+ case TSDataType::DOUBLE: {
+ ret.append(std::to_string(fields[i].doubleV));
break;
}
case TSDataType::TEXT: {
ret.append(fields[i].stringV);
break;
}
- case TSDataType::NULLTYPE:{
+ case TSDataType::NULLTYPE: {
ret.append("NULL");
}
}
@@ -499,17 +486,16 @@ public:
}
};
-class SessionDataSet
-{
+class SessionDataSet {
private:
bool hasCachedRecord = false;
std::string sql;
int64_t queryId;
int64_t sessionId;
- std::shared_ptr<TSIServiceIf> client;
+ std::shared_ptr <TSIServiceIf> client;
int batchSize = 1024;
- std::vector<std::string> columnNameList;
- std::vector<std::string> columnTypeDeduplicatedList;
+ std::vector <std::string> columnNameList;
+ std::vector <std::string> columnTypeDeduplicatedList;
// duplicated column index -> origin index
std::map<int, int> duplicateLocation;
// column name -> column location
@@ -518,19 +504,21 @@ private:
int columnSize = 0;
int rowsIndex = 0; // used to record the row index in current
TSQueryDataSet
- std::shared_ptr<TSQueryDataSet> tsQueryDataSet;
+ std::shared_ptr <TSQueryDataSet> tsQueryDataSet;
MyStringBuffer tsQueryDataSetTimeBuffer;
- std::vector<std::unique_ptr<MyStringBuffer>> valueBuffers;
- std::vector<std::unique_ptr<MyStringBuffer>> bitmapBuffers;
+ std::vector <std::unique_ptr<MyStringBuffer>> valueBuffers;
+ std::vector <std::unique_ptr<MyStringBuffer>> bitmapBuffers;
RowRecord rowRecord;
- char* currentBitmap = NULL; // used to cache the current bitmap for every
column
+ char *currentBitmap = NULL; // used to cache the current bitmap for every
column
static const int flag = 0x80; // used to do `or` operation with bitmap to
judge whether the value is null
public:
- SessionDataSet(){}
- SessionDataSet(std::string sql, std::vector<std::string>& columnNameList,
std::vector<std::string>& columnTypeList, int64_t queryId,
- std::shared_ptr<TSIServiceIf> client, int64_t sessionId,
std::shared_ptr<TSQueryDataSet> queryDataSet) :
tsQueryDataSetTimeBuffer(queryDataSet->time)
- {
+ SessionDataSet() {}
+
+ SessionDataSet(std::string sql, std::vector <std::string> &columnNameList,
+ std::vector <std::string> &columnTypeList, int64_t queryId,
+ std::shared_ptr <TSIServiceIf> client, int64_t sessionId,
+ std::shared_ptr <TSQueryDataSet> queryDataSet) :
tsQueryDataSetTimeBuffer(queryDataSet->time) {
this->sessionId = sessionId;
this->sql = sql;
this->queryId = queryId;
@@ -548,8 +536,10 @@ public:
this->columnMap[name] = i;
this->columnTypeDeduplicatedList.push_back(columnTypeList[i]);
}
- this->valueBuffers.push_back(std::unique_ptr<MyStringBuffer>(new
MyStringBuffer(queryDataSet->valueList[i])));
- this->bitmapBuffers.push_back(std::unique_ptr<MyStringBuffer>(new
MyStringBuffer(queryDataSet->bitmapList[i])));
+ this->valueBuffers.push_back(
+ std::unique_ptr<MyStringBuffer>(new
MyStringBuffer(queryDataSet->valueList[i])));
+ this->bitmapBuffers.push_back(
+ std::unique_ptr<MyStringBuffer>(new
MyStringBuffer(queryDataSet->bitmapList[i])));
}
this->tsQueryDataSet = queryDataSet;
}
@@ -562,115 +552,184 @@ public:
}
int getBatchSize();
+
void setBatchSize(int batchSize);
- std::vector<std::string> getColumnNames();
+
+ std::vector <std::string> getColumnNames();
+
bool hasNext();
+
void constructOneRow();
+
bool isNull(int index, int rowNum);
- RowRecord* next();
+
+ RowRecord *next();
+
void closeOperationHandle();
};
template<typename T>
-std::vector<T> sortList(std::vector<T>& valueList, int* index, int
indexLength) {
- std::vector<T> sortedValues(valueList.size());
+std::vector <T> sortList(std::vector <T> &valueList, int *index, int
indexLength) {
+ std::vector <T> sortedValues(valueList.size());
for (int i = 0; i < indexLength; i++) {
sortedValues[i] = valueList[index[i]];
}
return sortedValues;
}
-class Session
-{
- private:
- std::string host;
- int rpcPort;
- std::string username;
- std::string password;
- TSProtocolVersion::type protocolVersion =
TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
- std::shared_ptr<TSIServiceIf> client;
- std::shared_ptr<apache::thrift::transport::TSocket> transport;
- bool isClosed = true;
- int64_t sessionId;
- int64_t statementId;
- std::string zoneId;
- int fetchSize;
- const static int DEFAULT_FETCH_SIZE = 10000;
- const static int DEFAULT_TIMEOUT_MS = 0;
-
- bool checkSorted(Tablet& tablet);
- bool checkSorted(std::vector<int64_t>& times);
- void sortTablet(Tablet& tablet);
- void sortIndexByTimestamp(int *index, std::vector<int64_t>&
timestamps, int length);
- std::string getTimeZone();
- void setTimeZone(std::string zoneId);
- void appendValues(std::string &buffer, char* value, int size);
- void putValuesIntoBuffer(std::vector<TSDataType::TSDataType>& types,
std::vector<char*>& values, std::string& buf);
- int8_t getDataTypeNumber(TSDataType::TSDataType type);
- public:
- Session(std::string host, int rpcPort) : username("user"),
password("password") {
- this->host = host;
- this->rpcPort = rpcPort;
- }
+class Session {
+private:
+ std::string host;
+ int rpcPort;
+ std::string username;
+ std::string password;
+ TSProtocolVersion::type protocolVersion =
TSProtocolVersion::IOTDB_SERVICE_PROTOCOL_V3;
+ std::shared_ptr <TSIServiceIf> client;
+ std::shared_ptr <apache::thrift::transport::TSocket> transport;
+ bool isClosed = true;
+ int64_t sessionId;
+ int64_t statementId;
+ std::string zoneId;
+ int fetchSize;
+ const static int DEFAULT_FETCH_SIZE = 10000;
+ const static int DEFAULT_TIMEOUT_MS = 0;
- Session(std::string host, int rpcPort, std::string username,
std::string password)
- : fetchSize(10000) {
- this->host = host;
- this->rpcPort = rpcPort;
- this->username = username;
- this->password = password;
- this->zoneId = "UTC+08:00";
- }
+ bool checkSorted(Tablet &tablet);
- Session(std::string host, int rpcPort, std::string username,
std::string password, int fetchSize) {
- this->host = host;
- this->rpcPort = rpcPort;
- this->username = username;
- this->password = password;
- this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
- }
+ bool checkSorted(std::vector <int64_t> &times);
- Session(std::string host, std::string rpcPort, std::string username =
"user",
- std::string password = "password", int fetchSize = 10000) {
- this->host = host;
- this->rpcPort = stoi(rpcPort);
- this->username = username;
- this->password = password;
- this->fetchSize = fetchSize;
- this->zoneId = "UTC+08:00";
- }
+ void sortTablet(Tablet &tablet);
+
+ void sortIndexByTimestamp(int *index, std::vector <int64_t> &timestamps,
int length);
+
+ std::string getTimeZone();
+
+ void setTimeZone(std::string zoneId);
+
+ void appendValues(std::string &buffer, char *value, int size);
+
+ void
+ putValuesIntoBuffer(std::vector <TSDataType::TSDataType> &types,
std::vector<char *> &values, std::string &buf);
+
+ int8_t getDataTypeNumber(TSDataType::TSDataType type);
+
+public:
+ Session(std::string host, int rpcPort) : username("user"),
password("password") {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ }
+
+ Session(std::string host, int rpcPort, std::string username, std::string
password)
+ : fetchSize(10000) {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ this->username = username;
+ this->password = password;
+ this->zoneId = "UTC+08:00";
+ }
+
+ Session(std::string host, int rpcPort, std::string username, std::string
password, int fetchSize) {
+ this->host = host;
+ this->rpcPort = rpcPort;
+ this->username = username;
+ this->password = password;
+ this->fetchSize = fetchSize;
+ this->zoneId = "UTC+08:00";
+ }
+
+ Session(std::string host, std::string rpcPort, std::string username =
"user",
+ std::string password = "password", int fetchSize = 10000) {
+ this->host = host;
+ this->rpcPort = stoi(rpcPort);
+ this->username = username;
+ this->password = password;
+ this->fetchSize = fetchSize;
+ this->zoneId = "UTC+08:00";
+ }
+
+ void open();
+
+ void open(bool enableRPCCompression);
+
+ void open(bool enableRPCCompression, int connectionTimeoutInMs);
+
+ void close();
+
+ void insertRecord(std::string deviceId, int64_t time, std::vector
<std::string> &measurements,
+ std::vector <std::string> &values);
+
+ void insertRecord(std::string deviceId, int64_t time, std::vector
<std::string> &measurements,
+ std::vector <TSDataType::TSDataType> &types,
std::vector<char *> &values);
+
+ void insertRecords(std::vector <std::string> &deviceIds, std::vector
<int64_t> &times,
+ std::vector <std::vector<std::string>>
&measurementsList,
+ std::vector <std::vector<std::string>> &valuesList);
+
+ void insertRecords(std::vector <std::string> &deviceIds, std::vector
<int64_t> &times,
+ std::vector <std::vector<std::string>>
&measurementsList,
+ std::vector <std::vector<TSDataType::TSDataType>>
typesList,
+ std::vector <std::vector<char *>> &valuesList);
+
+ void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t>
&times,
+ std::vector <std::vector<std::string>>
measurementsList,
+ std::vector
<std::vector<TSDataType::TSDataType>> typesList,
+ std::vector <std::vector<char *>>
&valuesList);
+
+ void insertRecordsOfOneDevice(std::string deviceId, std::vector <int64_t>
&times,
+ std::vector <std::vector<std::string>>
measurementsList,
+ std::vector
<std::vector<TSDataType::TSDataType>> typesList,
+ std::vector <std::vector<char *>>
&valuesList, bool sorted);
+
+ void insertTablet(Tablet &tablet);
+
+ void insertTablet(Tablet &tablet, bool sorted);
+
+ void insertTablets(std::map<std::string, Tablet *> &tablets);
+
+ void insertTablets(std::map<std::string, Tablet *> &tablets, bool sorted);
+
+ void testInsertRecord(std::string deviceId, int64_t time, std::vector
<std::string> &measurements,
+ std::vector <std::string> &values);
+
+ void testInsertTablet(Tablet &tablet);
+
+ void testInsertRecords(std::vector <std::string> &deviceIds, std::vector
<int64_t> &times,
+ std::vector <std::vector<std::string>>
&measurementsList,
+ std::vector <std::vector<std::string>> &valuesList);
+
+ void deleteTimeseries(std::string path);
+
+ void deleteTimeseries(std::vector <std::string> &paths);
+
+ void deleteData(std::string path, int64_t time);
+
+ void deleteData(std::vector <std::string> &deviceId, int64_t time);
+
+ void setStorageGroup(std::string storageGroupId);
+
+ void deleteStorageGroup(std::string storageGroup);
+
+ void deleteStorageGroups(std::vector <std::string> &storageGroups);
+
+ void createTimeseries(std::string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor);
+
+ void createTimeseries(std::string path, TSDataType::TSDataType dataType,
TSEncoding::TSEncoding encoding,
+ CompressionType::CompressionType compressor,
+ std::map <std::string, std::string> *props, std::map
<std::string, std::string> *tags,
+ std::map <std::string, std::string> *attributes,
std::string measurementAlias);
+
+ void createMultiTimeseries(std::vector <std::string> paths, std::vector
<TSDataType::TSDataType> dataTypes,
+ std::vector <TSEncoding::TSEncoding> encodings,
+ std::vector <CompressionType::CompressionType>
compressors,
+ std::vector <std::map<std::string,
std::string>> *propsList,
+ std::vector <std::map<std::string,
std::string>> *tagsList,
+ std::vector <std::map<std::string,
std::string>> *attributesList,
+ std::vector <std::string>
*measurementAliasList);
+
+ bool checkTimeseriesExists(std::string path);
+
+ std::unique_ptr <SessionDataSet> executeQueryStatement(std::string sql);
- void open();
- void open(bool enableRPCCompression);
- void open(bool enableRPCCompression, int connectionTimeoutInMs);
- void close();
- void insertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<std::string>& values);
- void insertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<TSDataType::TSDataType>&
types, std::vector<char*>& values);
- void insertRecords(std::vector<std::string>& deviceIds,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>&
measurementsList, std::vector<std::vector<std::string>>& valuesList);
- void insertRecords(std::vector<std::string>& deviceIds,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>&
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList);
- void insertRecordsOfOneDevice(std::string deviceId,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList);
- void insertRecordsOfOneDevice(std::string deviceId,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>
measurementsList, std::vector<std::vector<TSDataType::TSDataType>> typesList,
std::vector<std::vector<char*>>& valuesList, bool sorted);
- void insertTablet(Tablet& tablet);
- void insertTablet(Tablet& tablet, bool sorted);
- void insertTablets(std::map<std::string, Tablet*>& tablets);
- void insertTablets(std::map<std::string, Tablet*>& tablets, bool
sorted);
- void testInsertRecord(std::string deviceId, int64_t time,
std::vector<std::string>& measurements, std::vector<std::string>& values);
- void testInsertTablet(Tablet& tablet);
- void testInsertRecords(std::vector<std::string>& deviceIds,
std::vector<int64_t>& times, std::vector<std::vector<std::string>>&
measurementsList, std::vector<std::vector<std::string>>& valuesList);
- void deleteTimeseries(std::string path);
- void deleteTimeseries(std::vector<std::string>& paths);
- void deleteData(std::string path, int64_t time);
- void deleteData(std::vector<std::string>& deviceId, int64_t time);
- void setStorageGroup(std::string storageGroupId);
- void deleteStorageGroup(std::string storageGroup);
- void deleteStorageGroups(std::vector<std::string>& storageGroups);
- void createTimeseries(std::string path, TSDataType::TSDataType
dataType, TSEncoding::TSEncoding encoding, CompressionType::CompressionType
compressor);
- void createTimeseries(std::string path, TSDataType::TSDataType
dataType, TSEncoding::TSEncoding encoding, CompressionType::CompressionType
compressor,
- std::map<std::string, std::string>* props, std::map<std::string,
std::string>* tags, std::map<std::string, std::string>* attributes, std::string
measurementAlias);
- void createMultiTimeseries(std::vector<std::string> paths,
std::vector<TSDataType::TSDataType> dataTypes,
std::vector<TSEncoding::TSEncoding> encodings,
std::vector<CompressionType::CompressionType> compressors,
- std::vector<std::map<std::string, std::string>>* propsList,
std::vector<std::map<std::string, std::string>>* tagsList,
std::vector<std::map<std::string, std::string>>* attributesList,
std::vector<std::string>* measurementAliasList);
- bool checkTimeseriesExists(std::string path);
- std::unique_ptr<SessionDataSet> executeQueryStatement(std::string sql);
- void executeNonQueryStatement(std::string sql);
+ void executeNonQueryStatement(std::string sql);
};