This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch IOTDB-3773 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bc31698ac8116b4e0312e0fa6340cfb01d4e609e Author: JackieTien97 <[email protected]> AuthorDate: Wed Nov 2 15:16:46 2022 +0800 Save one rpc call --- .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java | 72 +++++++------- .../org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java | 12 +-- .../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 5 +- .../service/thrift/impl/ClientRPCServiceImpl.java | 105 +++++++++++++++------ .../apache/iotdb/db/utils/QueryDataSetUtils.java | 14 ++- .../java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java | 15 +-- .../apache/iotdb/session/SessionConnection.java | 9 +- .../org/apache/iotdb/session/SessionDataSet.java | 10 +- thrift/src/main/thrift/client.thrift | 2 + 9 files changed, 154 insertions(+), 90 deletions(-) diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java index f900c32c99..2977eb60dc 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java @@ -606,12 +606,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -651,12 +651,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -706,7 +706,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -952,7 +952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -1061,7 +1061,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -1116,12 +1116,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1230,12 +1230,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1330,7 +1330,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -1399,7 +1399,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -1452,12 +1452,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1501,12 +1501,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1705,7 +1705,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -1761,12 +1761,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1806,12 +1806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -1952,7 +1952,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2006,12 +2006,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -2047,12 +2047,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -2171,7 +2171,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2211,7 +2211,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2383,7 +2383,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2560,7 +2560,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2763,7 +2763,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { true, client, null, - 0, + -1, sessionId, Collections.singletonList(tsBlock), null, @@ -2806,12 +2806,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override @@ -2860,12 +2860,12 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData { false, client, null, - 0, + -1, sessionId, null, null, (long) 60 * 1000, - true); + false); } @Override diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java index 1222ad128e..8fc39b31e3 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBJDBCResultSet.java @@ -60,8 +60,6 @@ public class IoTDBJDBCResultSet implements ResultSet { protected List<String> columnTypeList; protected IoTDBRpcDataSet ioTDBRpcDataSet; protected IoTDBTracingInfo ioTDBRpcTracingInfo; - private boolean isRpcFetchResult = true; - private String operationType = ""; private List<String> columns = null; private List<String> sgColumns = null; @@ -82,7 +80,8 @@ public class IoTDBJDBCResultSet implements ResultSet { String operationType, List<String> columns, List<String> sgColumns, - BitSet aliasColumnMap) + BitSet aliasColumnMap, + boolean moreData) throws SQLException { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( @@ -91,7 +90,7 @@ public class IoTDBJDBCResultSet implements ResultSet { columnTypeList, columnNameIndex, ignoreTimeStamp, - true, + moreData, queryId, ((IoTDBStatement) statement).getStmtId(), client, @@ -125,7 +124,7 @@ public class IoTDBJDBCResultSet implements ResultSet { List<ByteBuffer> dataSet, TSTracingInfo tracingInfo, long timeout, - boolean isRpcFetchResult) + boolean moreData) throws SQLException { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( @@ -134,7 +133,7 @@ public class IoTDBJDBCResultSet implements ResultSet { columnTypeList, columnNameIndex, ignoreTimeStamp, - isRpcFetchResult, + moreData, queryId, ((IoTDBStatement) statement).getStmtId(), client, @@ -144,7 +143,6 @@ public class IoTDBJDBCResultSet implements ResultSet { timeout); this.statement = statement; this.columnTypeList = columnTypeList; - this.isRpcFetchResult = isRpcFetchResult; if (tracingInfo != null) { ioTDBRpcTracingInfo = new IoTDBTracingInfo(); ioTDBRpcTracingInfo.setTsTracingInfo(tracingInfo); diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java index 3fb925e46b..b3633436ef 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBStatement.java @@ -302,7 +302,7 @@ public class IoTDBStatement implements Statement { execResp.queryResult, execResp.tracingInfo, execReq.timeout, - true); + execResp.moreData); } return true; } @@ -457,7 +457,8 @@ public class IoTDBStatement implements Statement { execResp.operationType, execResp.columns, execResp.sgColumns, - aliasColumn); + aliasColumn, + execResp.moreData); } return resultSet; } diff --git a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java index 0b8526bad2..2efdf6a85a 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/thrift/impl/ClientRPCServiceImpl.java @@ -112,6 +112,7 @@ import org.apache.iotdb.service.rpc.thrift.TSyncIdentityInfo; import org.apache.iotdb.service.rpc.thrift.TSyncTransportMetaInfo; import org.apache.iotdb.tsfile.read.common.block.TsBlock; import org.apache.iotdb.tsfile.read.common.block.column.Column; +import org.apache.iotdb.tsfile.utils.Pair; import org.apache.thrift.TException; import org.slf4j.Logger; @@ -149,19 +150,25 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @FunctionalInterface public interface SelectResult { - public void apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) + boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize) throws IoTDBException, IOException; } private static final SelectResult SELECT_RESULT = - (resp, queryExecution, fetchSize) -> - resp.setQueryResult( - QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize)); + (resp, queryExecution, fetchSize) -> { + Pair<List<ByteBuffer>, Boolean> pair = + QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, fetchSize); + resp.setQueryResult(pair.left); + return pair.right; + }; private static final SelectResult OLD_SELECT_RESULT = - (resp, queryExecution, fetchSize) -> - resp.setQueryDataSet( - QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize)); + (resp, queryExecution, fetchSize) -> { + Pair<TSQueryDataSet, Boolean> pair = + QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, fetchSize); + resp.setQueryDataSet(pair.left); + return pair.right; + }; public ClientRPCServiceImpl() { if (config.isClusterMode()) { @@ -175,6 +182,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { private TSExecuteStatementResp executeStatementInternal( TSExecuteStatementReq req, SelectResult setResult) { + boolean finished = false; + long queryId = Long.MIN_VALUE; String statement = req.getStatement(); if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); @@ -200,8 +209,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { QUERY_FREQUENCY_RECORDER.incrementAndGet(); AUDIT_LOGGER.debug("Session {} execute Query: {}", req.sessionId, statement); - long queryId = - SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); + queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); // create and cache dataset ExecutionResult result = COORDINATOR.execute( @@ -225,22 +233,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (queryExecution != null && queryExecution.isQuery()) { resp = createResponse(queryExecution.getDatasetHeader(), queryId); resp.setStatus(result.status); - setResult.apply(resp, queryExecution, req.fetchSize); + finished = setResult.apply(resp, queryExecution, req.fetchSize); + resp.setMoreData(!finished); } else { resp = RpcUtils.getTSExecuteStatementResp(result.status); } return resp; } } catch (Exception e) { + finished = true; return RpcUtils.getTSExecuteStatementResp( onQueryException(e, "\"" + statement + "\". " + OperationType.EXECUTE_STATEMENT)); } finally { addOperationLatency(Operation.EXECUTE_QUERY, startTime); + if (finished) { + COORDINATOR.cleanupQueryExecution(queryId); + } } } private TSExecuteStatementResp executeRawDataQueryInternal( TSRawDataQueryReq req, SelectResult setResult) { + boolean finished = false; + long queryId = Long.MIN_VALUE; if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); } @@ -257,8 +272,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { QUERY_FREQUENCY_RECORDER.incrementAndGet(); AUDIT_LOGGER.debug("Session {} execute Raw Data Query: {}", req.sessionId, req); - long queryId = - SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); + queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); // create and cache dataset ExecutionResult result = COORDINATOR.execute( @@ -281,23 +295,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (queryExecution.isQuery()) { resp = createResponse(queryExecution.getDatasetHeader(), queryId); resp.setStatus(result.status); - setResult.apply(resp, queryExecution, req.fetchSize); + finished = setResult.apply(resp, queryExecution, req.fetchSize); + resp.setMoreData(!finished); } else { resp = RpcUtils.getTSExecuteStatementResp(result.status); } return resp; } } catch (Exception e) { - // TODO call the coordinator to release query resource + finished = true; return RpcUtils.getTSExecuteStatementResp( onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_RAW_DATA_QUERY)); } finally { addOperationLatency(Operation.EXECUTE_QUERY, startTime); + if (finished) { + COORDINATOR.cleanupQueryExecution(queryId); + } } } private TSExecuteStatementResp executeLastDataQueryInternal( TSLastDataQueryReq req, SelectResult setResult) { + boolean finished = false; + long queryId = Long.MIN_VALUE; if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { return RpcUtils.getTSExecuteStatementResp(getNotLoggedInStatus()); } @@ -312,8 +332,7 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } QUERY_FREQUENCY_RECORDER.incrementAndGet(); AUDIT_LOGGER.debug("Session {} execute Last Data Query: {}", req.sessionId, req); - long queryId = - SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); + queryId = SESSION_MANAGER.requestQueryId(SESSION_MANAGER.getCurrSession(), req.statementId); // create and cache dataset ExecutionResult result = COORDINATOR.execute( @@ -336,7 +355,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { if (queryExecution.isQuery()) { resp = createResponse(queryExecution.getDatasetHeader(), queryId); resp.setStatus(result.status); - setResult.apply(resp, queryExecution, req.fetchSize); + finished = setResult.apply(resp, queryExecution, req.fetchSize); + resp.setMoreData(!finished); } else { resp = RpcUtils.getTSExecuteStatementResp(result.status); } @@ -344,11 +364,14 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { } } catch (Exception e) { - // TODO call the coordinator to release query resource + finished = true; return RpcUtils.getTSExecuteStatementResp( onQueryException(e, "\"" + req + "\". " + OperationType.EXECUTE_LAST_DATA_QUERY)); } finally { addOperationLatency(Operation.EXECUTE_QUERY, startTime); + if (finished) { + COORDINATOR.cleanupQueryExecution(queryId); + } } } @@ -379,6 +402,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) { + long startTime = System.currentTimeMillis(); + boolean finished = false; try { if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); @@ -386,20 +411,33 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId); + + if (queryExecution == null) { + resp.setHasResultSet(false); + resp.setMoreData(true); + return resp; + } + try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { - List<ByteBuffer> result = + Pair<List<ByteBuffer>, Boolean> pair = QueryDataSetUtils.convertQueryResultByFetchSize(queryExecution, req.fetchSize); + List<ByteBuffer> result = pair.left; + finished = pair.right; boolean hasResultSet = !(result.size() == 0); resp.setHasResultSet(hasResultSet); resp.setIsAlign(true); resp.setQueryResult(result); - if (!hasResultSet) { - COORDINATOR.cleanupQueryExecution(req.queryId); - } + resp.setMoreData(!finished); return resp; } } catch (Exception e) { + finished = true; return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS)); + } finally { + addOperationLatency(Operation.EXECUTE_QUERY, startTime); + if (finished) { + COORDINATOR.cleanupQueryExecution(req.queryId); + } } } @@ -857,6 +895,8 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSFetchResultsResp fetchResults(TSFetchResultsReq req) { + boolean finished = false; + long startTime = System.currentTimeMillis(); try { if (!SESSION_MANAGER.checkLogin(SESSION_MANAGER.getCurrSession())) { return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus()); @@ -865,21 +905,32 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS); IQueryExecution queryExecution = COORDINATOR.getQueryExecution(req.queryId); + if (queryExecution == null) { + resp.setHasResultSet(false); + resp.setMoreData(true); + return resp; + } + try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) { - TSQueryDataSet result = + Pair<TSQueryDataSet, Boolean> pair = QueryDataSetUtils.convertTsBlockByFetchSize(queryExecution, req.fetchSize); + TSQueryDataSet result = pair.left; + finished = pair.right; boolean hasResultSet = result.bufferForTime().limit() != 0; - resp.setHasResultSet(hasResultSet); resp.setQueryDataSet(result); resp.setIsAlign(true); - if (!hasResultSet) { - COORDINATOR.cleanupQueryExecution(req.queryId); - } + resp.setMoreData(finished); return resp; } } catch (Exception e) { + finished = true; return RpcUtils.getTSFetchResultsResp(onQueryException(e, OperationType.FETCH_RESULTS)); + } finally { + addOperationLatency(Operation.EXECUTE_QUERY, startTime); + if (finished) { + COORDINATOR.cleanupQueryExecution(req.queryId); + } } } diff --git a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java index ca603bf535..ff3cda054a 100644 --- a/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java +++ b/server/src/main/java/org/apache/iotdb/db/utils/QueryDataSetUtils.java @@ -32,6 +32,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet; import org.apache.iotdb.tsfile.utils.Binary; import org.apache.iotdb.tsfile.utils.BitMap; import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.utils.Pair; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; @@ -179,8 +180,9 @@ public class QueryDataSetUtils { return tsQueryDataSet; } - public static TSQueryDataSet convertTsBlockByFetchSize( + public static Pair<TSQueryDataSet, Boolean> convertTsBlockByFetchSize( IQueryExecution queryExecution, int fetchSize) throws IOException, IoTDBException { + boolean finished = false; int columnNum = queryExecution.getOutputValueColumnCount(); TSQueryDataSet tsQueryDataSet = new TSQueryDataSet(); // one time column and each value column has an actual value buffer and a bitmap value to @@ -201,6 +203,7 @@ public class QueryDataSetUtils { while (rowCount < fetchSize) { Optional<TsBlock> optionalTsBlock = queryExecution.getBatchResult(); if (!optionalTsBlock.isPresent()) { + finished = true; break; } TsBlock tsBlock = optionalTsBlock.get(); @@ -371,17 +374,20 @@ public class QueryDataSetUtils { } tsQueryDataSet.setBitmapList(bitmapList); tsQueryDataSet.setValueList(valueList); - return tsQueryDataSet; + return new Pair<>(tsQueryDataSet, finished); } + /** pair.left is serialized TsBlock pair.right indicates if the query finished */ // To fetch required amounts of data and combine them through List - public static List<ByteBuffer> convertQueryResultByFetchSize( + public static Pair<List<ByteBuffer>, Boolean> convertQueryResultByFetchSize( IQueryExecution queryExecution, int fetchSize) throws IoTDBException { int rowCount = 0; List<ByteBuffer> res = new ArrayList<>(); + boolean finished = false; while (rowCount < fetchSize) { Optional<ByteBuffer> optionalByteBuffer = queryExecution.getByteBufferBatchResult(); if (!optionalByteBuffer.isPresent()) { + finished = true; break; } ByteBuffer byteBuffer = optionalByteBuffer.get(); @@ -397,7 +403,7 @@ public class QueryDataSetUtils { } rowCount += positionCount; } - return res; + return new Pair<>(res, finished); } public static long[] readTimesFromBuffer(ByteBuffer buffer, int size) { diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java index 3077c571be..98a116b008 100644 --- a/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java +++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/IoTDBRpcDataSet.java @@ -64,7 +64,8 @@ public class IoTDBRpcDataSet { public long queryId; public long statementId; public boolean ignoreTimeStamp; - public boolean isRpcFetchResult; + // indicates that there is still more data in server side and we can call fetchResult to get more + public boolean moreData; public static final TsBlockSerde serde = new TsBlockSerde(); public List<ByteBuffer> queryResult; @@ -81,7 +82,7 @@ public class IoTDBRpcDataSet { List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, - boolean isRpcFetchResult, + boolean moreData, long queryId, long statementId, IClientRPCService.Iface client, @@ -97,7 +98,7 @@ public class IoTDBRpcDataSet { this.client = client; this.fetchSize = fetchSize; this.timeout = timeout; - this.isRpcFetchResult = isRpcFetchResult; + this.moreData = moreData; columnSize = columnNameList.size(); this.columnNameList = new ArrayList<>(); @@ -162,7 +163,7 @@ public class IoTDBRpcDataSet { List<String> columnTypeList, Map<String, Integer> columnNameIndex, boolean ignoreTimeStamp, - boolean isRpcFetchResult, + boolean moreData, long queryId, long statementId, IClientRPCService.Iface client, @@ -180,7 +181,7 @@ public class IoTDBRpcDataSet { this.client = client; this.fetchSize = fetchSize; this.timeout = timeout; - this.isRpcFetchResult = isRpcFetchResult; + this.moreData = moreData; columnSize = columnNameList.size(); this.columnNameList = new ArrayList<>(); @@ -289,7 +290,7 @@ public class IoTDBRpcDataSet { "Cannot close dataset, because of network connection: {} ", e); } } - if (isRpcFetchResult && fetchResults() && hasCachedByteBuffer()) { + if (moreData && fetchResults() && hasCachedByteBuffer()) { constructOneTsBlock(); constructOneRow(); return true; @@ -309,8 +310,8 @@ public class IoTDBRpcDataSet { req.setTimeout(timeout); try { TSFetchResultsResp resp = client.fetchResultsV2(req); - RpcUtils.verifySuccess(resp.getStatus()); + moreData = resp.moreData; if (!resp.hasResultSet) { emptyResultSet = true; close(); diff --git a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java index 630f6ffdeb..bd2ca927bc 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionConnection.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionConnection.java @@ -377,7 +377,8 @@ public class SessionConnection { sessionId, execResp.queryResult, execResp.isIgnoreTimeStamp(), - timeout); + timeout, + execResp.moreData); } protected void executeNonQueryStatement(String sql) @@ -439,7 +440,8 @@ public class SessionConnection { client, sessionId, execResp.queryResult, - execResp.isIgnoreTimeStamp()); + execResp.isIgnoreTimeStamp(), + execResp.moreData); } protected SessionDataSet executeLastDataQuery(List<String> paths, long time, long timeOut) @@ -478,7 +480,8 @@ public class SessionConnection { client, sessionId, tsExecuteStatementResp.queryResult, - tsExecuteStatementResp.isIgnoreTimeStamp()); + tsExecuteStatementResp.isIgnoreTimeStamp(), + tsExecuteStatementResp.moreData); } protected void insertRecord(TSInsertRecordReq request) diff --git a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java index f7b725ab73..c07336bb36 100644 --- a/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java +++ b/session/src/main/java/org/apache/iotdb/session/SessionDataSet.java @@ -51,7 +51,8 @@ public class SessionDataSet implements AutoCloseable { IClientRPCService.Iface client, long sessionId, List<ByteBuffer> queryResult, - boolean ignoreTimeStamp) { + boolean ignoreTimeStamp, + boolean moreData) { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( sql, @@ -59,7 +60,7 @@ public class SessionDataSet implements AutoCloseable { columnTypeList, columnNameIndex, ignoreTimeStamp, - true, + moreData, queryId, statementId, client, @@ -80,7 +81,8 @@ public class SessionDataSet implements AutoCloseable { long sessionId, List<ByteBuffer> queryResult, boolean ignoreTimeStamp, - long timeout) { + long timeout, + boolean moreData) { this.ioTDBRpcDataSet = new IoTDBRpcDataSet( sql, @@ -88,7 +90,7 @@ public class SessionDataSet implements AutoCloseable { columnTypeList, columnNameIndex, ignoreTimeStamp, - true, + moreData, queryId, statementId, client, diff --git a/thrift/src/main/thrift/client.thrift b/thrift/src/main/thrift/client.thrift index 61744021c8..0cccfe9ee2 100644 --- a/thrift/src/main/thrift/client.thrift +++ b/thrift/src/main/thrift/client.thrift @@ -68,6 +68,7 @@ struct TSExecuteStatementResp { 11: optional list<byte> aliasColumns 12: optional TSTracingInfo tracingInfo 13: optional list<binary> queryResult + 14: optional bool moreData } enum TSProtocolVersion { @@ -176,6 +177,7 @@ struct TSFetchResultsResp{ 4: optional TSQueryDataSet queryDataSet 5: optional TSQueryNonAlignDataSet nonAlignQueryDataSet 6: optional list<binary> queryResult + 7: optional bool moreData } struct TSFetchMetadataResp{
