This is an automated email from the ASF dual-hosted git repository. xuekaifeng pushed a commit to branch xkf_tpc_test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 241e43f78dc0838f7620bfb8e78d53190a267def Author: 151250176 <[email protected]> AuthorDate: Mon Jan 4 12:59:04 2021 +0800 for tpc --- .../apache/iotdb/db/service/AsyncInsertPool.java | 51 + .../org/apache/iotdb/db/service/TSServiceImpl.java | 6 + .../org/apache/iotdb/db/serviceSession/Config.java | 31 + .../apache/iotdb/db/serviceSession/Session.java | 1116 ++++++++++++++++++++ .../iotdb/db/serviceSession/SessionConnection.java | 624 +++++++++++ .../iotdb/db/serviceSession/SessionDataSet.java | 234 ++++ .../iotdb/db/serviceSession/SessionUtils.java | 90 ++ .../serviceSession/pool/SessionDataSetWrapper.java | 101 ++ .../iotdb/db/serviceSession/pool/SessionPool.java | 983 +++++++++++++++++ .../iotdb/db/integration/IoTDBMultiDeviceIT.java | 322 ++++++ thrift/src/main/thrift/rpc.thrift | 1 + 11 files changed, 3559 insertions(+) diff --git a/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java new file mode 100644 index 0000000..23c7bf7 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/service/AsyncInsertPool.java @@ -0,0 +1,51 @@ +package org.apache.iotdb.db.service; + +import java.util.concurrent.ExecutorService; +import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.db.serviceSession.pool.SessionPool; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AsyncInsertPool { + + private static final Logger logger = LoggerFactory.getLogger(AsyncInsertPool.class); + ExecutorService pool; + SessionPool sessionPool; + + + private AsyncInsertPool(){ + sessionPool = new SessionPool("192.168.130.6", 6667, "root", "root", 20); + pool = IoTDBThreadPoolFactory + .newFixedThreadPool(10, "async insert pool"); + } + + + public void submit(TSInsertTabletsReq req){ + pool.submit(new Runnable() { + @Override + public void run() { + try { + sessionPool.insertTablets(req); + } catch (IoTDBConnectionException | StatementExecutionException e) { + logger.error("transfer request failed", e); + } + } + }); + } + + public static AsyncInsertPool getInstance() { + return AsyncInsertPool.InstanceHolder.INSTANCE; + } + + static class InstanceHolder { + + private InstanceHolder() { + // forbidding instantiation + } + + private static final AsyncInsertPool INSTANCE = new AsyncInsertPool(); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java index e423b14..0d95ee2 100644 --- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java @@ -1497,6 +1497,12 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext { @Override public TSStatus insertTablets(TSInsertTabletsReq req) { + // transfer to another + if(!req.isFinal){ + req.isFinal = true; + AsyncInsertPool.getInstance().submit(req); + } + // long t1 = System.currentTimeMillis(); try { if (!checkLogin(req.getSessionId())) { diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java new file mode 100644 index 0000000..71339a8 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/Config.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession; + +public class Config { + + public static final String DEFAULT_USER = "root"; + public static final String DEFAULT_PASSWORD = "root"; + public static final int DEFAULT_FETCH_SIZE = 10000; + public static final int DEFAULT_TIMEOUT_MS = 0; + public static final boolean DEFAULT_CACHE_LEADER_MODE = true; + + public static final int RETRY_NUM = 3; + public static final long RETRY_INTERVAL_MS = 1000; +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java new file mode 100644 index 0000000..9dbe2f0 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/Session.java @@ -0,0 +1,1116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession; + +import java.nio.ByteBuffer; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.iotdb.rpc.BatchExecutionException; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.RedirectException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.EndPoint; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@SuppressWarnings({"java:S107", "java:S1135"}) // need enough parameters, ignore todos +public class Session { + + private static final Logger logger = LoggerFactory.getLogger(Session.class); + protected static final TSProtocolVersion protocolVersion = TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3; + public static final String MSG_UNSUPPORTED_DATA_TYPE = "Unsupported data type:"; + protected String username; + protected String password; + protected int fetchSize; + protected boolean enableRPCCompression; + protected int connectionTimeoutInMs; + + private EndPoint defaultEndPoint; + private SessionConnection defaultSessionConnection; + protected boolean isClosed = true; + private ZoneId zoneId; + + // Cluster version cache + private SessionConnection metaSessionConnection; + private Map<String, EndPoint> deviceIdToEndpoint; + private Map<EndPoint, SessionConnection> endPointToSessionConnection; + private AtomicReference<IoTDBConnectionException> tmp = new AtomicReference<>(); + + public Session(String host, int rpcPort) { + this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, + null); + } + + public Session(String host, String rpcPort, String username, String password) { + this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null); + } + + public Session(String host, int rpcPort, String username, String password) { + this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null); + } + + public Session(String host, int rpcPort, String username, String password, int fetchSize) { + this(host, rpcPort, username, password, fetchSize, null); + } + + public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) { + this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId); + } + + public Session(String host, int rpcPort, String username, String password, int fetchSize, + ZoneId zoneId) { + this.defaultEndPoint = new EndPoint(host, rpcPort); + this.username = username; + this.password = password; + this.fetchSize = fetchSize; + this.zoneId = zoneId; + } + + public void setFetchSize(int fetchSize){ + this.fetchSize = fetchSize; + } + + public int getFetchSize(){ return this.fetchSize; } + + public synchronized void open() throws IoTDBConnectionException { + open(false, Config.DEFAULT_TIMEOUT_MS); + } + + public synchronized void open(boolean enableRPCCompression) throws IoTDBConnectionException { + open(enableRPCCompression, Config.DEFAULT_TIMEOUT_MS); + } + + private synchronized void open(boolean enableRPCCompression, int connectionTimeoutInMs) + throws IoTDBConnectionException { + if (!isClosed) { + return; + } + + this.enableRPCCompression = enableRPCCompression; + this.connectionTimeoutInMs = connectionTimeoutInMs; + defaultSessionConnection = new SessionConnection(this, defaultEndPoint, zoneId); + metaSessionConnection = defaultSessionConnection; + isClosed = false; + if (Config.DEFAULT_CACHE_LEADER_MODE) { + deviceIdToEndpoint = new HashMap<>(); + endPointToSessionConnection = new HashMap<>(); + endPointToSessionConnection.put(defaultEndPoint, defaultSessionConnection); + } + } + + public synchronized void close() throws IoTDBConnectionException { + if (isClosed) { + return; + } + try { + if (Config.DEFAULT_CACHE_LEADER_MODE) { + for (SessionConnection sessionConnection : endPointToSessionConnection.values()) { + sessionConnection.close(); + } + } else { + defaultSessionConnection.close(); + } + } finally { + isClosed = true; + } + } + + public synchronized String getTimeZone() { + return defaultSessionConnection.getTimeZone(); + } + + public synchronized void setTimeZone(String zoneId) + throws StatementExecutionException, IoTDBConnectionException { + defaultSessionConnection.setTimeZone(zoneId); + } + + public void setStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + try { + metaSessionConnection.setStorageGroup(storageGroup); + } catch (RedirectException e) { + handleMetaRedirection(storageGroup, e); + } + } + + public void deleteStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + try { + metaSessionConnection.deleteStorageGroups(Collections.singletonList(storageGroup)); + } catch (RedirectException e) { + handleMetaRedirection(storageGroup, e); + } + } + + public void deleteStorageGroups(List<String> storageGroups) + throws IoTDBConnectionException, StatementExecutionException { + try { + metaSessionConnection.deleteStorageGroups(storageGroups); + } catch (RedirectException e) { + handleMetaRedirection(storageGroups.toString(), e); + } + } + + public void createTimeseries(String path, TSDataType dataType, + TSEncoding encoding, CompressionType compressor) + throws IoTDBConnectionException, StatementExecutionException { + TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, encoding, compressor, + null, null, null, null); + defaultSessionConnection.createTimeseries(request); + } + + public void createTimeseries(String path, TSDataType dataType, + TSEncoding encoding, CompressionType compressor, Map<String, String> props, + Map<String, String> tags, Map<String, String> attributes, String measurementAlias) + throws IoTDBConnectionException, StatementExecutionException { + TSCreateTimeseriesReq request = genTSCreateTimeseriesReq(path, dataType, encoding, compressor, + props, tags, attributes, measurementAlias); + defaultSessionConnection.createTimeseries(request); + } + + private TSCreateTimeseriesReq genTSCreateTimeseriesReq(String path, TSDataType dataType, + TSEncoding encoding, CompressionType compressor, Map<String, String> props, + Map<String, String> tags, Map<String, String> attributes, String measurementAlias) { + TSCreateTimeseriesReq request = new TSCreateTimeseriesReq(); + request.setPath(path); + request.setDataType(dataType.ordinal()); + request.setEncoding(encoding.ordinal()); + request.setCompressor(compressor.ordinal()); + request.setProps(props); + request.setTags(tags); + request.setAttributes(attributes); + request.setMeasurementAlias(measurementAlias); + return request; + } + + public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes, + List<TSEncoding> encodings, List<CompressionType> compressors, + List<Map<String, String>> propsList, List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList, List<String> measurementAliasList) + throws IoTDBConnectionException, StatementExecutionException { + TSCreateMultiTimeseriesReq request = genTSCreateMultiTimeseriesReq(paths, dataTypes, encodings, + compressors, propsList, tagsList, attributesList, measurementAliasList); + defaultSessionConnection.createMultiTimeseries(request); + } + + private TSCreateMultiTimeseriesReq genTSCreateMultiTimeseriesReq(List<String> paths, + List<TSDataType> dataTypes, + List<TSEncoding> encodings, List<CompressionType> compressors, + List<Map<String, String>> propsList, List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList, List<String> measurementAliasList) { + TSCreateMultiTimeseriesReq request = new TSCreateMultiTimeseriesReq(); + + request.setPaths(paths); + + List<Integer> dataTypeOrdinals = new ArrayList<>(paths.size()); + for (TSDataType dataType : dataTypes) { + dataTypeOrdinals.add(dataType.ordinal()); + } + request.setDataTypes(dataTypeOrdinals); + + List<Integer> encodingOrdinals = new ArrayList<>(paths.size()); + for (TSEncoding encoding : encodings) { + encodingOrdinals.add(encoding.ordinal()); + } + request.setEncodings(encodingOrdinals); + + List<Integer> compressionOrdinals = new ArrayList<>(paths.size()); + for (CompressionType compression : compressors) { + compressionOrdinals.add(compression.ordinal()); + } + request.setCompressors(compressionOrdinals); + + request.setPropsList(propsList); + request.setTagsList(tagsList); + request.setAttributesList(attributesList); + request.setMeasurementAliasList(measurementAliasList); + + return request; + } + + public boolean checkTimeseriesExists(String path) + throws IoTDBConnectionException, StatementExecutionException { + return defaultSessionConnection.checkTimeseriesExists(path); + } + + /** + * execure query sql + * + * @param sql query statement + * @return result set + */ + public SessionDataSet executeQueryStatement(String sql) + throws StatementExecutionException, IoTDBConnectionException { + return defaultSessionConnection.executeQueryStatement(sql); + } + + /** + * execute non query statement + * + * @param sql non query statement + */ + public void executeNonQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + defaultSessionConnection.executeNonQueryStatement(sql); + } + + /** + * query eg. select * from paths where time >= startTime and time < endTime time interval include + * startTime and exclude endTime + * + * @param paths + * @param startTime included + * @param endTime excluded + * @return + * @throws StatementExecutionException + * @throws IoTDBConnectionException + */ + + public SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) + throws StatementExecutionException, IoTDBConnectionException { + return defaultSessionConnection.executeRawDataQuery(paths, startTime, endTime); + } + + + /** + * insert data in one row, if you want to improve your performance, please use insertRecords + * method or insertTablet method + * + * @see Session#insertRecords(List, List, List, List, List) + * @see Session#insertTablet(Tablet) + */ + public void insertRecord(String deviceId, long time, List<String> measurements, + List<TSDataType> types, + Object... values) throws IoTDBConnectionException, StatementExecutionException { + TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, + Arrays.asList(values)); + insertRecord(deviceId, request); + } + + private void insertRecord(String deviceId, TSInsertRecordReq request) + throws IoTDBConnectionException, StatementExecutionException { + try { + getSessionConnection(deviceId).insertRecord(request); + } catch (RedirectException e) { + handleRedirection(deviceId, e.getEndPoint()); + } + } + + private void insertRecord(String deviceId, TSInsertStringRecordReq request) + throws IoTDBConnectionException, StatementExecutionException { + try { + getSessionConnection(deviceId).insertRecord(request); + } catch (RedirectException e) { + handleRedirection(deviceId, e.getEndPoint()); + } + } + + private SessionConnection getSessionConnection(String deviceId) { + EndPoint endPoint; + if (Config.DEFAULT_CACHE_LEADER_MODE + && (endPoint = deviceIdToEndpoint.get(deviceId)) != null) { + return endPointToSessionConnection.get(endPoint); + } else { + return defaultSessionConnection; + } + } + + private void handleMetaRedirection(String storageGroup, RedirectException e) + throws IoTDBConnectionException { + if (Config.DEFAULT_CACHE_LEADER_MODE) { + logger.debug("storageGroup[{}]:{}", storageGroup, e.getMessage()); + SessionConnection connection = endPointToSessionConnection + .computeIfAbsent(e.getEndPoint(), k -> { + try { + return new SessionConnection(this, e.getEndPoint(), zoneId); + } catch (IoTDBConnectionException ex) { + tmp.set(ex); + return null; + } + }); + if (connection == null) { + throw new IoTDBConnectionException(tmp.get()); + } + metaSessionConnection = connection; + } + } + + private void handleRedirection(String deviceId, EndPoint endpoint) + throws IoTDBConnectionException { + if (Config.DEFAULT_CACHE_LEADER_MODE) { + deviceIdToEndpoint.put(deviceId, endpoint); + SessionConnection connection = endPointToSessionConnection + .computeIfAbsent(endpoint, k -> { + try { + return new SessionConnection(this, endpoint, zoneId); + } catch (IoTDBConnectionException ex) { + tmp.set(ex); + return null; + } + }); + if (connection == null) { + throw new IoTDBConnectionException(tmp.get()); + } + } + } + + /** + * insert data in one row, if you want improve your performance, please use insertInBatch method + * or insertBatch method + * + * @see Session#insertRecords(List, List, List, List, List) + * @see Session#insertTablet(Tablet) + */ + public void insertRecord(String deviceId, long time, List<String> measurements, + List<TSDataType> types, + List<Object> values) throws IoTDBConnectionException, StatementExecutionException { + TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values); + insertRecord(deviceId, request); + } + + private TSInsertRecordReq genTSInsertRecordReq(String deviceId, long time, + List<String> measurements, + List<TSDataType> types, + List<Object> values) throws IoTDBConnectionException { + TSInsertRecordReq request = new TSInsertRecordReq(); + request.setDeviceId(deviceId); + request.setTimestamp(time); + request.setMeasurements(measurements); + ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values)); + putValues(types, values, buffer); + request.setValues(buffer); + return request; + } + + /** + * insert data in one row, if you want improve your performance, please use insertInBatch method + * or insertBatch method + * + * @see Session#insertRecords(List, List, List, List, List) + * @see Session#insertTablet(Tablet) + */ + public void insertRecord(String deviceId, long time, List<String> measurements, + List<String> values) throws IoTDBConnectionException, StatementExecutionException { + TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements, + values); + insertRecord(deviceId, request); + } + + private TSInsertStringRecordReq genTSInsertStringRecordReq(String deviceId, long time, + List<String> measurements, List<String> values) { + TSInsertStringRecordReq request = new TSInsertStringRecordReq(); + request.setDeviceId(deviceId); + request.setTimestamp(time); + request.setMeasurements(measurements); + request.setValues(values); + return request; + } + + /** + * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc + * executeBatch, we pack some insert request in batch and send them to server. If you want improve + * your performance, please see insertTablet method + * <p> + * Each row is independent, which could have different deviceId, time, number of measurements + * + * @see Session#insertTablet(Tablet) + */ + public void insertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + int len = deviceIds.size(); + if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) { + throw new IllegalArgumentException( + "deviceIds, times, measurementsList and valuesList's size should be equal"); + } + if (Config.DEFAULT_CACHE_LEADER_MODE) { + insertStringRecordsWithLeaderCache(deviceIds, times, measurementsList, valuesList); + } else { + TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times, + measurementsList, valuesList); + try { + defaultSessionConnection.insertRecords(request); + } catch (RedirectException ignored) { + // ignore + } + } + } + + private void insertStringRecordsWithLeaderCache(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + Map<String, TSInsertStringRecordsReq> deviceGroup = new HashMap<>(); + for (int i = 0; i < deviceIds.size(); i++) { + TSInsertStringRecordsReq request = deviceGroup + .computeIfAbsent(deviceIds.get(i), k -> new TSInsertStringRecordsReq()); + updateTSInsertStringRecordsReq(request, deviceIds.get(i), times.get(i), + measurementsList.get(i), valuesList.get(i)); + } + //TODO parallel + StringBuilder errMsgBuilder = new StringBuilder(); + for (Entry<String, TSInsertStringRecordsReq> entry : deviceGroup.entrySet()) { + try { + getSessionConnection(entry.getKey()).insertRecords(entry.getValue()); + } catch (RedirectException e) { + handleRedirection(entry.getKey(), e.getEndPoint()); + } catch (StatementExecutionException e) { + errMsgBuilder.append(e.getMessage()); + } + } + String errMsg = errMsgBuilder.toString(); + if (!errMsg.isEmpty()) { + throw new StatementExecutionException(errMsg); + } + } + + private TSInsertStringRecordsReq genTSInsertStringRecordsReq(List<String> deviceId, + List<Long> time, + List<List<String>> measurements, List<List<String>> values) { + TSInsertStringRecordsReq request = new TSInsertStringRecordsReq(); + request.setDeviceIds(deviceId); + request.setTimestamps(time); + request.setMeasurementsList(measurements); + request.setValuesList(values); + return request; + } + + private void updateTSInsertStringRecordsReq(TSInsertStringRecordsReq request, + String deviceId, long time, + List<String> measurements, List<String> values) { + request.addToDeviceIds(deviceId); + request.addToTimestamps(time); + request.addToMeasurementsList(measurements); + request.addToValuesList(values); + } + + /** + * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc + * executeBatch, we pack some insert request in batch and send them to server. If you want improve + * your performance, please see insertTablet method + * <p> + * Each row is independent, which could have different deviceId, time, number of measurements + * + * @see Session#insertTablet(Tablet) + */ + public void insertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + int len = deviceIds.size(); + if (len != times.size() || len != measurementsList.size() || len != valuesList.size()) { + throw new IllegalArgumentException( + "deviceIds, times, measurementsList and valuesList's size should be equal"); + } + if (Config.DEFAULT_CACHE_LEADER_MODE) { + insertRecordsWithLeaderCache(deviceIds, times, measurementsList, typesList, valuesList); + } else { + TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList, + typesList, valuesList); + try { + defaultSessionConnection + .insertRecords(request); + } catch (RedirectException ignored) { + // ignore + } + } + } + + private void insertRecordsWithLeaderCache(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + Map<String, TSInsertRecordsReq> deviceGroup = new HashMap<>(); + for (int i = 0; i < deviceIds.size(); i++) { + TSInsertRecordsReq request = deviceGroup + .computeIfAbsent(deviceIds.get(i), k -> new TSInsertRecordsReq()); + updateTSInsertRecordsReq(request, deviceIds.get(i), times.get(i), + measurementsList.get(i), typesList.get(i), valuesList.get(i)); + } + //TODO parallel + StringBuilder errMsgBuilder = new StringBuilder(); + for (Entry<String, TSInsertRecordsReq> entry : deviceGroup.entrySet()) { + try { + getSessionConnection(entry.getKey()).insertRecords(entry.getValue()); + } catch (RedirectException e) { + handleRedirection(entry.getKey(), e.getEndPoint()); + } catch (StatementExecutionException e) { + errMsgBuilder.append(e.getMessage()); + } + } + String errMsg = errMsgBuilder.toString(); + if (!errMsg.isEmpty()) { + throw new StatementExecutionException(errMsg); + } + } + + private TSInsertRecordsReq genTSInsertRecordsReq(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) throws IoTDBConnectionException { + TSInsertRecordsReq request = new TSInsertRecordsReq(); + request.setDeviceIds(deviceIds); + request.setTimestamps(times); + request.setMeasurementsList(measurementsList); + List<ByteBuffer> buffersList = new ArrayList<>(); + for (int i = 0; i < measurementsList.size(); i++) { + ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i))); + putValues(typesList.get(i), valuesList.get(i), buffer); + buffersList.add(buffer); + } + request.setValuesList(buffersList); + return request; + } + + private void updateTSInsertRecordsReq(TSInsertRecordsReq request, String deviceId, Long time, + List<String> measurements, List<TSDataType> types, + List<Object> values) throws IoTDBConnectionException { + request.addToDeviceIds(deviceId); + request.addToTimestamps(time); + request.addToMeasurementsList(measurements); + ByteBuffer buffer = ByteBuffer.allocate(calculateLength(types, values)); + putValues(types, values, buffer); + request.addToValuesList(buffer); + } + + /** + * insert the data of a device. For each timestamp, the number of measurements is the same. + * <p> + * a Tablet example: device1 time s1, s2, s3 1, 1, 1, 1 2, 2, 2, 2 3, 3, 3, 3 + * <p/> + * times in Tablet may be not in ascending order + * + * @param tablet data batch + */ + public void insertTablet(Tablet tablet) + throws StatementExecutionException, IoTDBConnectionException { + TSInsertTabletReq request = genTSInsertTabletReq(tablet, false); + EndPoint endPoint; + try { + if (Config.DEFAULT_CACHE_LEADER_MODE + && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) { + endPointToSessionConnection.get(endPoint).insertTablet(request); + } else { + defaultSessionConnection.insertTablet(request); + } + } catch (RedirectException e) { + handleRedirection(tablet.deviceId, e.getEndPoint()); + } + } + + /** + * insert a Tablet + * + * @param tablet data batch + * @param sorted whether times in Tablet are in ascending order + */ + public void insertTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted); + EndPoint endPoint; + try { + if (Config.DEFAULT_CACHE_LEADER_MODE + && (endPoint = deviceIdToEndpoint.get(tablet.deviceId)) != null) { + endPointToSessionConnection.get(endPoint).insertTablet(request); + } else { + defaultSessionConnection.insertTablet(request); + } + } catch (RedirectException e) { + handleRedirection(tablet.deviceId, e.getEndPoint()); + } + } + + private TSInsertTabletReq genTSInsertTabletReq(Tablet tablet, boolean sorted) + throws BatchExecutionException { + if (sorted) { + checkSortedThrowable(tablet); + } else { + sortTablet(tablet); + } + + TSInsertTabletReq request = new TSInsertTabletReq(); + request.setDeviceId(tablet.deviceId); + for (MeasurementSchema measurementSchema : tablet.getSchemas()) { + request.addToMeasurements(measurementSchema.getMeasurementId()); + request.addToTypes(measurementSchema.getType().ordinal()); + } + request.setTimestamps(SessionUtils.getTimeBuffer(tablet)); + request.setValues(SessionUtils.getValueBuffer(tablet)); + request.setSize(tablet.rowSize); + return request; + } + + /** + * insert the data of several deivces. Given a deivce, for each timestamp, the number of + * measurements is the same. + * <p> + * Times in each Tablet may not be in ascending order + * + * @param tablets data batch in multiple device + */ + public void insertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + insertTablets(tablets, false); + } + + + /** + * insert the data of several devices. Given a device, for each timestamp, the number of + * measurements is the same. + * + * @param tablets data batch in multiple device + * @param sorted whether times in each Tablet are in ascending order + */ + public void insertTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + if (Config.DEFAULT_CACHE_LEADER_MODE) { + insertTabletsWithLeaderCache(tablets, sorted); + } else { + TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted); + try { + defaultSessionConnection.insertTablets(request); + } catch (RedirectException ignored) { + // ignored + } + } + } + + /** + * just fot test + */ + public void insertTablets(TSInsertTabletsReq request) throws IoTDBConnectionException{ + try { + defaultSessionConnection.insertTablets(request); + } catch (RedirectException ignored) { + // ignored + } catch (StatementExecutionException e) { + e.printStackTrace(); + } + } + + + private void insertTabletsWithLeaderCache(Map<String, Tablet> tablets, boolean sorted) throws + IoTDBConnectionException, StatementExecutionException { + EndPoint endPoint; + SessionConnection connection; + Map<SessionConnection, TSInsertTabletsReq> tabletGroup = new HashMap<>(); + for (Entry<String, Tablet> entry : tablets.entrySet()) { + endPoint = deviceIdToEndpoint.get(entry.getKey()); + if (endPoint != null) { + connection = endPointToSessionConnection.get(endPoint); + } else { + connection = defaultSessionConnection; + } + TSInsertTabletsReq request = tabletGroup + .computeIfAbsent(connection, k -> new TSInsertTabletsReq()); + updateTSInsertTabletsReq(request, entry.getValue(), sorted); + } + + //TODO parallel + StringBuilder errMsgBuilder = new StringBuilder(); + for (Entry<SessionConnection, TSInsertTabletsReq> entry : tabletGroup.entrySet()) { + try { + entry.getKey().insertTablets(entry.getValue()); + } catch (RedirectException e) { + for (Entry<String, EndPoint> deviceEndPointEntry : e.getDeviceEndPointMap().entrySet()) { + handleRedirection(deviceEndPointEntry.getKey(), deviceEndPointEntry.getValue()); + } + } catch (StatementExecutionException e) { + errMsgBuilder.append(e.getMessage()); + } + } + String errMsg = errMsgBuilder.toString(); + if (!errMsg.isEmpty()) { + throw new StatementExecutionException(errMsg); + } + } + + private TSInsertTabletsReq genTSInsertTabletsReq(List<Tablet> tablets, boolean sorted) + throws BatchExecutionException { + TSInsertTabletsReq request = new TSInsertTabletsReq(); + + for (Tablet tablet : tablets) { + updateTSInsertTabletsReq(request, tablet, sorted); + } + return request; + } + + private void updateTSInsertTabletsReq(TSInsertTabletsReq request, Tablet tablet, boolean sorted) + throws BatchExecutionException { + if (sorted) { + checkSortedThrowable(tablet); + } else { + sortTablet(tablet); + } + + request.addToDeviceIds(tablet.deviceId); + List<String> measurements = new ArrayList<>(); + List<Integer> dataTypes = new ArrayList<>(); + for (MeasurementSchema measurementSchema : tablet.getSchemas()) { + measurements.add(measurementSchema.getMeasurementId()); + dataTypes.add(measurementSchema.getType().ordinal()); + } + request.addToMeasurementsList(measurements); + request.addToTypesList(dataTypes); + request.addToTimestampsList(SessionUtils.getTimeBuffer(tablet)); + request.addToValuesList(SessionUtils.getValueBuffer(tablet)); + request.addToSizeList(tablet.rowSize); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + testInsertTablet(tablet, false); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertTabletReq request = genTSInsertTabletReq(tablet, sorted); + defaultSessionConnection.testInsertTablet(request); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + testInsertTablets(tablets, false); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertTabletsReq request = genTSInsertTabletsReq(new ArrayList<>(tablets.values()), sorted); + defaultSessionConnection.testInsertTablets(request); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertStringRecordsReq request = genTSInsertStringRecordsReq(deviceIds, times, + measurementsList, valuesList); + defaultSessionConnection.testInsertRecords(request); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertRecordsReq request = genTSInsertRecordsReq(deviceIds, times, measurementsList, + typesList, valuesList); + defaultSessionConnection.testInsertRecords(request); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecord(String deviceId, long time, List<String> measurements, + List<String> values) throws IoTDBConnectionException, StatementExecutionException { + TSInsertStringRecordReq request = genTSInsertStringRecordReq(deviceId, time, measurements, + values); + defaultSessionConnection.testInsertRecord(request); + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecord(String deviceId, long time, List<String> measurements, + List<TSDataType> types, List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + TSInsertRecordReq request = genTSInsertRecordReq(deviceId, time, measurements, types, values); + defaultSessionConnection.testInsertRecord(request); + } + + /** + * delete a timeseries, including data and schema + * + * @param path timeseries to delete, should be a whole path + */ + public void deleteTimeseries(String path) + throws IoTDBConnectionException, StatementExecutionException { + defaultSessionConnection.deleteTimeseries(Collections.singletonList(path)); + } + + /** + * delete some timeseries, including data and schema + * + * @param paths timeseries to delete, should be a whole path + */ + public void deleteTimeseries(List<String> paths) + throws IoTDBConnectionException, StatementExecutionException { + defaultSessionConnection.deleteTimeseries(paths); + } + + /** + * delete data <= time in one timeseries + * + * @param path data in which time series to delete + * @param endTime data with time stamp less than or equal to time will be deleted + */ + public void deleteData(String path, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + deleteData(Collections.singletonList(path), Long.MIN_VALUE, endTime); + } + + /** + * delete data <= time in multiple timeseries + * + * @param paths data in which time series to delete + * @param endTime data with time stamp less than or equal to time will be deleted + */ + public void deleteData(List<String> paths, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + deleteData(paths, Long.MIN_VALUE, endTime); + } + + /** + * delete data >= startTime and data <= endTime in multiple timeseries + * + * @param paths data in which time series to delete + * @param startTime delete range start time + * @param endTime delete range end time + */ + public void deleteData(List<String> paths, long startTime, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + TSDeleteDataReq request = genTSDeleteDataReq(paths, startTime, endTime); + defaultSessionConnection.deleteData(request); + } + + private TSDeleteDataReq genTSDeleteDataReq(List<String> paths, long startTime, long endTime) { + TSDeleteDataReq request = new TSDeleteDataReq(); + request.setPaths(paths); + request.setStartTime(startTime); + request.setEndTime(endTime); + return request; + } + + private int calculateLength(List<TSDataType> types, List<Object> values) + throws IoTDBConnectionException { + int res = 0; + for (int i = 0; i < types.size(); i++) { + // types + res += Short.BYTES; + switch (types.get(i)) { + case BOOLEAN: + res += 1; + break; + case INT32: + res += Integer.BYTES; + break; + case INT64: + res += Long.BYTES; + break; + case FLOAT: + res += Float.BYTES; + break; + case DOUBLE: + res += Double.BYTES; + break; + case TEXT: + res += Integer.BYTES; + res += ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET).length; + break; + default: + throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i)); + } + } + return res; + } + + /** + * put value in buffer + * + * @param types types list + * @param values values list + * @param buffer buffer to insert + * @throws IoTDBConnectionException + */ + private void putValues(List<TSDataType> types, List<Object> values, ByteBuffer buffer) + throws IoTDBConnectionException { + for (int i = 0; i < values.size(); i++) { + ReadWriteIOUtils.write(types.get(i), buffer); + switch (types.get(i)) { + case BOOLEAN: + ReadWriteIOUtils.write((Boolean) values.get(i), buffer); + break; + case INT32: + ReadWriteIOUtils.write((Integer) values.get(i), buffer); + break; + case INT64: + ReadWriteIOUtils.write((Long) values.get(i), buffer); + break; + case FLOAT: + ReadWriteIOUtils.write((Float) values.get(i), buffer); + break; + case DOUBLE: + ReadWriteIOUtils.write((Double) values.get(i), buffer); + break; + case TEXT: + byte[] bytes = ((String) values.get(i)).getBytes(TSFileConfig.STRING_CHARSET); + ReadWriteIOUtils.write(bytes.length, buffer); + buffer.put(bytes); + break; + default: + throw new IoTDBConnectionException(MSG_UNSUPPORTED_DATA_TYPE + types.get(i)); + } + } + buffer.flip(); + } + + /** + * check whether the batch has been sorted + * + * @return whether the batch has been sorted + */ + private boolean checkSorted(Tablet tablet) { + for (int i = 1; i < tablet.rowSize; i++) { + if (tablet.timestamps[i] < tablet.timestamps[i - 1]) { + return false; + } + } + return true; + } + + private void checkSortedThrowable(Tablet tablet) throws BatchExecutionException { + if (!checkSorted(tablet)) { + throw new BatchExecutionException("Times in Tablet are not in ascending order"); + } + } + + protected void sortTablet(Tablet tablet) { + /* + * following part of code sort the batch data by time, + * so we can insert continuous data in value list to get a better performance + */ + // sort to get index, and use index to sort value list + Integer[] index = new Integer[tablet.rowSize]; + for (int i = 0; i < tablet.rowSize; i++) { + index[i] = i; + } + Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o])); + Arrays.sort(tablet.timestamps, 0, tablet.rowSize); + for (int i = 0; i < tablet.getSchemas().size(); i++) { + tablet.values[i] = + sortList(tablet.values[i], tablet.getSchemas().get(i).getType(), index); + } + } + + /** + * sort value list by index + * + * @param valueList value list + * @param dataType data type + * @param index index + * @return sorted list + */ + private Object sortList(Object valueList, TSDataType dataType, Integer[] index) { + switch (dataType) { + case BOOLEAN: + boolean[] boolValues = (boolean[]) valueList; + boolean[] sortedValues = new boolean[boolValues.length]; + for (int i = 0; i < index.length; i++) { + sortedValues[i] = boolValues[index[i]]; + } + return sortedValues; + case INT32: + int[] intValues = (int[]) valueList; + int[] sortedIntValues = new int[intValues.length]; + for (int i = 0; i < index.length; i++) { + sortedIntValues[i] = intValues[index[i]]; + } + return sortedIntValues; + case INT64: + long[] longValues = (long[]) valueList; + long[] sortedLongValues = new long[longValues.length]; + for (int i = 0; i < index.length; i++) { + sortedLongValues[i] = longValues[index[i]]; + } + return sortedLongValues; + case FLOAT: + float[] floatValues = (float[]) valueList; + float[] sortedFloatValues = new float[floatValues.length]; + for (int i = 0; i < index.length; i++) { + sortedFloatValues[i] = floatValues[index[i]]; + } + return sortedFloatValues; + case DOUBLE: + double[] doubleValues = (double[]) valueList; + double[] sortedDoubleValues = new double[doubleValues.length]; + for (int i = 0; i < index.length; i++) { + sortedDoubleValues[i] = doubleValues[index[i]]; + } + return sortedDoubleValues; + case TEXT: + Binary[] binaryValues = (Binary[]) valueList; + Binary[] sortedBinaryValues = new Binary[binaryValues.length]; + for (int i = 0; i < index.length; i++) { + sortedBinaryValues[i] = binaryValues[index[i]]; + } + return sortedBinaryValues; + default: + throw new UnSupportedDataTypeException(MSG_UNSUPPORTED_DATA_TYPE + dataType); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java new file mode 100644 index 0000000..91d7ac4 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionConnection.java @@ -0,0 +1,624 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.serviceSession; + +import java.time.ZoneId; +import java.util.List; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.RedirectException; +import org.apache.iotdb.rpc.RpcTransportFactory; +import org.apache.iotdb.rpc.RpcUtils; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.EndPoint; +import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq; +import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementReq; +import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertStringRecordsReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; +import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq; +import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; +import org.apache.iotdb.service.rpc.thrift.TSStatus; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.protocol.TCompactProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SessionConnection { + + private static final Logger logger = LoggerFactory.getLogger(SessionConnection.class); + public static final String MSG_RECONNECTION_FAIL = "Fail to reconnect to server. Please check server status"; + private Session session; + private TTransport transport; + private TSIService.Iface client; + private long sessionId; + private long statementId; + private ZoneId zoneId; + private EndPoint endPoint; + + public SessionConnection(Session session, EndPoint endPoint, ZoneId zoneId) + throws IoTDBConnectionException { + this.session = session; + this.endPoint = endPoint; + this.zoneId = zoneId == null ? ZoneId.systemDefault() : zoneId; + init(endPoint); + } + + private void init(EndPoint endPoint) throws IoTDBConnectionException { + transport = RpcTransportFactory.INSTANCE.getTransport( + new TSocket(endPoint.getIp(), endPoint.getPort(), session.connectionTimeoutInMs)); + + try { + transport.open(); + } catch (TTransportException e) { + throw new IoTDBConnectionException(e); + } + + if (session.enableRPCCompression) { + client = new TSIService.Client(new TCompactProtocol(transport)); + } else { + client = new TSIService.Client(new TBinaryProtocol(transport)); + } + client = RpcUtils.newSynchronizedClient(client); + + TSOpenSessionReq openReq = new TSOpenSessionReq(); + openReq.setUsername(session.username); + openReq.setPassword(session.password); + openReq.setZoneId(zoneId.toString()); + + try { + TSOpenSessionResp openResp = client.openSession(openReq); + + RpcUtils.verifySuccess(openResp.getStatus()); + + if (Session.protocolVersion.getValue() != openResp.getServerProtocolVersion().getValue()) { + logger.warn("Protocol differ, Client version is {}}, but Server version is {}", + Session.protocolVersion.getValue(), openResp.getServerProtocolVersion().getValue()); + // less than 0.10 + if (openResp.getServerProtocolVersion().getValue() == 0) { + throw new TException(String + .format("Protocol not supported, Client version is %s, but Server version is %s", + Session.protocolVersion.getValue(), + openResp.getServerProtocolVersion().getValue())); + } + } + + sessionId = openResp.getSessionId(); + statementId = client.requestStatementId(sessionId); + + } catch (Exception e) { + transport.close(); + throw new IoTDBConnectionException(e); + } + } + + + public void close() throws IoTDBConnectionException { + TSCloseSessionReq req = new TSCloseSessionReq(sessionId); + try { + client.closeSession(req); + } catch (TException e) { + throw new IoTDBConnectionException( + "Error occurs when closing session at server. Maybe server is down.", e); + } finally { + if (transport != null) { + transport.close(); + } + } + } + + protected void setTimeZone(String zoneId) + throws StatementExecutionException, IoTDBConnectionException { + TSSetTimeZoneReq req = new TSSetTimeZoneReq(sessionId, zoneId); + TSStatus resp; + try { + resp = client.setTimeZone(req); + } catch (TException e) { + if (reconnect()) { + try { + req.setSessionId(sessionId); + resp = client.setTimeZone(req); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + RpcUtils.verifySuccess(resp); + this.zoneId = ZoneId.of(zoneId); + } + + protected String getTimeZone() { + if (zoneId == null) { + zoneId = ZoneId.systemDefault(); + } + return zoneId.toString(); + } + + protected void setStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + try { + RpcUtils.verifySuccessWithRedirection(client.setStorageGroup(sessionId, storageGroup)); + } catch (TException e) { + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.setStorageGroup(sessionId, storageGroup)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void deleteStorageGroups(List<String> storageGroups) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + try { + RpcUtils.verifySuccessWithRedirection(client.deleteStorageGroups(sessionId, storageGroups)); + } catch (TException e) { + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.deleteStorageGroups(sessionId, storageGroups)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void createTimeseries(TSCreateTimeseriesReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.createTimeseries(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.createTimeseries(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void createMultiTimeseries(TSCreateMultiTimeseriesReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.createMultiTimeseries(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.createMultiTimeseries(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected boolean checkTimeseriesExists(String path) + throws IoTDBConnectionException, StatementExecutionException { + SessionDataSet dataSet = null; + try { + dataSet = executeQueryStatement(String.format("SHOW TIMESERIES %s", path)); + return dataSet.hasNext(); + } finally { + if (dataSet != null) { + dataSet.closeOperationHandle(); + } + } + } + + protected SessionDataSet executeQueryStatement(String sql) + throws StatementExecutionException, IoTDBConnectionException { + TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId); + execReq.setFetchSize(session.fetchSize); + TSExecuteStatementResp execResp; + try { + execResp = client.executeQueryStatement(execReq); + } catch (TException e) { + if (reconnect()) { + try { + execReq.setSessionId(sessionId); + execReq.setStatementId(statementId); + execResp = client.executeQueryStatement(execReq); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + + RpcUtils.verifySuccess(execResp.getStatus()); + return new SessionDataSet(sql, execResp.getColumns(), execResp.getDataTypeList(), + execResp.columnNameIndexMap, + execResp.getQueryId(), client, sessionId, execResp.queryDataSet, + execResp.isIgnoreTimeStamp()); + } + + protected void executeNonQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + TSExecuteStatementReq execReq = new TSExecuteStatementReq(sessionId, sql, statementId); + try { + TSExecuteStatementResp execResp = client.executeUpdateStatement(execReq); + RpcUtils.verifySuccess(execResp.getStatus()); + } catch (TException e) { + if (reconnect()) { + try { + execReq.setSessionId(sessionId); + execReq.setStatementId(statementId); + RpcUtils.verifySuccess(client.executeUpdateStatement(execReq).status); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected SessionDataSet executeRawDataQuery(List<String> paths, long startTime, long endTime) + throws StatementExecutionException, IoTDBConnectionException { + TSRawDataQueryReq execReq = new TSRawDataQueryReq(sessionId, paths, startTime, endTime, + statementId); + execReq.setFetchSize(session.fetchSize); + TSExecuteStatementResp execResp; + try { + execResp = client.executeRawDataQuery(execReq); + } catch (TException e) { + if (reconnect()) { + try { + execReq.setSessionId(sessionId); + execReq.setStatementId(statementId); + execResp = client.executeRawDataQuery(execReq); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + + RpcUtils.verifySuccess(execResp.getStatus()); + return new SessionDataSet("", execResp.getColumns(), execResp.getDataTypeList(), + execResp.columnNameIndexMap, + execResp.getQueryId(), client, sessionId, execResp.queryDataSet, + execResp.isIgnoreTimeStamp()); + } + + protected void insertRecord(TSInsertRecordReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirection(client.insertRecord(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected void insertRecord(TSInsertStringRecordReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirection(client.insertStringRecord(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertStringRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected void insertRecords(TSInsertRecordsReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirection(client.insertRecords(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void insertRecords(TSInsertStringRecordsReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirection(client.insertStringRecords(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertStringRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected void insertTablet(TSInsertTabletReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirection(client.insertTablet(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertTablet(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void insertTablets(TSInsertTabletsReq request) + throws IoTDBConnectionException, StatementExecutionException, RedirectException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccessWithRedirectionForInsertTablets(client.insertTablets(request), request); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.insertTablets(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void deleteTimeseries(List<String> paths) + throws IoTDBConnectionException, StatementExecutionException { + try { + RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths)); + } catch (TException e) { + if (reconnect()) { + try { + RpcUtils.verifySuccess(client.deleteTimeseries(sessionId, paths)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + public void deleteData(TSDeleteDataReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.deleteData(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.deleteData(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException( + MSG_RECONNECTION_FAIL); + } + } + } + + protected void testInsertRecord(TSInsertStringRecordReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertStringRecord(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertStringRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void testInsertRecord(TSInsertRecordReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertRecord(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertRecord(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + public void testInsertRecords(TSInsertStringRecordsReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertStringRecords(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertStringRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + public void testInsertRecords(TSInsertRecordsReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertRecords(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertRecords(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void testInsertTablet(TSInsertTabletReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertTablet(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertTablet(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + protected void testInsertTablets(TSInsertTabletsReq request) + throws IoTDBConnectionException, StatementExecutionException { + request.setSessionId(sessionId); + try { + RpcUtils.verifySuccess(client.testInsertTablets(request)); + } catch (TException e) { + if (reconnect()) { + try { + request.setSessionId(sessionId); + RpcUtils.verifySuccess(client.testInsertTablets(request)); + } catch (TException tException) { + throw new IoTDBConnectionException(tException); + } + } else { + throw new IoTDBConnectionException(MSG_RECONNECTION_FAIL); + } + } + } + + private boolean reconnect() { + boolean flag = false; + for (int i = 1; i <= Config.RETRY_NUM; i++) { + try { + if (transport != null) { + close(); + init(endPoint); + flag = true; + } + } catch (Exception e) { + try { + Thread.sleep(Config.RETRY_INTERVAL_MS); + } catch (InterruptedException e1) { + logger.error("reconnect is interrupted.", e1); + Thread.currentThread().interrupt(); + } + } + } + return flag; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java new file mode 100644 index 0000000..a2a0907 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionDataSet.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession; + +import static org.apache.iotdb.rpc.IoTDBRpcDataSet.START_INDEX; + +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.IoTDBRpcDataSet; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.Field; +import org.apache.iotdb.tsfile.read.common.RowRecord; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.thrift.TException; + +public class SessionDataSet { + + private final IoTDBRpcDataSet ioTDBRpcDataSet; + + public SessionDataSet(String sql, List<String> columnNameList, List<String> columnTypeList, + Map<String, Integer> columnNameIndex, + long queryId, TSIService.Iface client, long sessionId, TSQueryDataSet queryDataSet, + boolean ignoreTimeStamp) { + this.ioTDBRpcDataSet = new IoTDBRpcDataSet(sql, columnNameList, columnTypeList, columnNameIndex, + ignoreTimeStamp, queryId, client, sessionId, queryDataSet, Config.DEFAULT_FETCH_SIZE); + } + + public int getFetchSize() { + return ioTDBRpcDataSet.fetchSize; + } + + public void setFetchSize(int fetchSize) { + ioTDBRpcDataSet.fetchSize = fetchSize; + } + + public List<String> getColumnNames() { + return new ArrayList<>(ioTDBRpcDataSet.columnNameList); + } + + public List<TSDataType> getColumnTypes() { + return new ArrayList<>(ioTDBRpcDataSet.columnTypeList); + } + + + public boolean hasNext() throws StatementExecutionException, IoTDBConnectionException { + return ioTDBRpcDataSet.next(); + } + + + private RowRecord constructRowRecordFromValueArray() throws StatementExecutionException { + List<Field> outFields = new ArrayList<>(); + for (int i = 0; i < ioTDBRpcDataSet.columnSize; i++) { + Field field; + + int index = i + 1; + int datasetColumnIndex = i + START_INDEX; + if (ioTDBRpcDataSet.ignoreTimeStamp) { + index--; + datasetColumnIndex--; + } + int loc = + ioTDBRpcDataSet.columnOrdinalMap.get(ioTDBRpcDataSet.columnNameList.get(index)) + - START_INDEX; + + if (!ioTDBRpcDataSet.isNull(datasetColumnIndex)) { + byte[] valueBytes = ioTDBRpcDataSet.values[loc]; + TSDataType dataType = ioTDBRpcDataSet.columnTypeDeduplicatedList.get(loc); + field = new Field(dataType); + switch (dataType) { + case BOOLEAN: + boolean booleanValue = BytesUtils.bytesToBool(valueBytes); + field.setBoolV(booleanValue); + break; + case INT32: + int intValue = BytesUtils.bytesToInt(valueBytes); + field.setIntV(intValue); + break; + case INT64: + long longValue = BytesUtils.bytesToLong(valueBytes); + field.setLongV(longValue); + break; + case FLOAT: + float floatValue = BytesUtils.bytesToFloat(valueBytes); + field.setFloatV(floatValue); + break; + case DOUBLE: + double doubleValue = BytesUtils.bytesToDouble(valueBytes); + field.setDoubleV(doubleValue); + break; + case TEXT: + field.setBinaryV(new Binary(valueBytes)); + break; + default: + throw new UnSupportedDataTypeException(String + .format("Data type %s is not supported.", + ioTDBRpcDataSet.columnTypeDeduplicatedList.get(i))); + } + } else { + field = new Field(null); + } + outFields.add(field); + } + return new RowRecord(BytesUtils.bytesToLong(ioTDBRpcDataSet.time), outFields); + } + + + public RowRecord next() throws StatementExecutionException, IoTDBConnectionException { + if (!ioTDBRpcDataSet.hasCachedRecord && !hasNext()) { + return null; + } + ioTDBRpcDataSet.hasCachedRecord = false; + + return constructRowRecordFromValueArray(); + } + + public void closeOperationHandle() throws StatementExecutionException, IoTDBConnectionException { + try { + ioTDBRpcDataSet.close(); + } catch (TException e) { + throw new IoTDBConnectionException(e.getMessage()); + } + } + + public DataIterator iterator() { + return new DataIterator(); + } + + public class DataIterator { + + public boolean next() throws StatementExecutionException, IoTDBConnectionException { + return ioTDBRpcDataSet.next(); + } + + public boolean isNull(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.isNull(columnIndex); + } + + public boolean isNull(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.isNull(columnName); + } + + public boolean getBoolean(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getBoolean(columnIndex); + } + + public boolean getBoolean(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getBoolean(columnName); + } + + public double getDouble(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getDouble(columnIndex); + } + + public double getDouble(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getDouble(columnName); + } + + public float getFloat(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getFloat(columnIndex); + } + + public float getFloat(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getFloat(columnName); + } + + public int getInt(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getInt(columnIndex); + } + + public int getInt(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getInt(columnName); + } + + public long getLong(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getLong(columnIndex); + } + + public long getLong(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getLong(columnName); + } + + public Object getObject(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getObject(columnIndex); + } + + public Object getObject(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getObject(columnName); + } + + public String getString(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getString(columnIndex); + } + + public String getString(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getString(columnName); + } + + public Timestamp getTimestamp(int columnIndex) throws StatementExecutionException { + return ioTDBRpcDataSet.getTimestamp(columnIndex); + } + + public Timestamp getTimestamp(String columnName) throws StatementExecutionException { + return ioTDBRpcDataSet.getTimestamp(columnName); + } + + public int findColumn(String columnName) { + return ioTDBRpcDataSet.findColumn(columnName); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java new file mode 100644 index 0000000..8e48bf1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/SessionUtils.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession; + +import java.nio.ByteBuffer; +import org.apache.iotdb.tsfile.exception.write.UnSupportedDataTypeException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.utils.Binary; +import org.apache.iotdb.tsfile.utils.BytesUtils; +import org.apache.iotdb.tsfile.write.record.Tablet; + +public class SessionUtils { + + public static ByteBuffer getTimeBuffer(Tablet tablet) { + ByteBuffer timeBuffer = ByteBuffer.allocate(tablet.getTimeBytesSize()); + for (int i = 0; i < tablet.rowSize; i++) { + timeBuffer.putLong(tablet.timestamps[i]); + } + timeBuffer.flip(); + return timeBuffer; + } + + @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning + public static ByteBuffer getValueBuffer(Tablet tablet) { + ByteBuffer valueBuffer = ByteBuffer.allocate(tablet.getValueBytesSize()); + for (int i = 0; i < tablet.getSchemas().size(); i++) { + TSDataType dataType = tablet.getSchemas().get(i).getType(); + switch (dataType) { + case INT32: + int[] intValues = (int[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.putInt(intValues[index]); + } + break; + case INT64: + long[] longValues = (long[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.putLong(longValues[index]); + } + break; + case FLOAT: + float[] floatValues = (float[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.putFloat(floatValues[index]); + } + break; + case DOUBLE: + double[] doubleValues = (double[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.putDouble(doubleValues[index]); + } + break; + case BOOLEAN: + boolean[] boolValues = (boolean[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.put(BytesUtils.boolToByte(boolValues[index])); + } + break; + case TEXT: + Binary[] binaryValues = (Binary[]) tablet.values[i]; + for (int index = 0; index < tablet.rowSize; index++) { + valueBuffer.putInt(binaryValues[index].getLength()); + valueBuffer.put(binaryValues[index].getValues()); + } + break; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + } + valueBuffer.flip(); + return valueBuffer; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java new file mode 100644 index 0000000..e29bec6 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionDataSetWrapper.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession.pool; + +import java.util.List; +import org.apache.iotdb.db.serviceSession.Session; +import org.apache.iotdb.db.serviceSession.SessionDataSet; +import org.apache.iotdb.db.serviceSession.SessionDataSet.DataIterator; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.common.RowRecord; + +public class SessionDataSetWrapper implements AutoCloseable{ + + SessionDataSet sessionDataSet; + Session session; + SessionPool pool; + + public SessionDataSetWrapper(SessionDataSet sessionDataSet, + Session session, SessionPool pool) { + this.sessionDataSet = sessionDataSet; + this.session = session; + this.pool = pool; + } + + protected Session getSession() { + return session; + } + + public int getBatchSize() { + return sessionDataSet.getFetchSize(); + } + + public void setBatchSize(int batchSize) { + sessionDataSet.setFetchSize(batchSize); + } + + /** + * If there is an Exception, and you do not want to use the resultset anymore, + * you have to release the resultset manually by calling closeResultSet + * @return + * @throws IoTDBConnectionException + * @throws StatementExecutionException + */ + public boolean hasNext() throws IoTDBConnectionException, StatementExecutionException { + boolean next = sessionDataSet.hasNext(); + if (!next) { + pool.closeResultSet(this); + } + return next; + } + /** + * If there is an Exception, and you do not want to use the resultset anymore, + * you have to release the resultset manually by calling closeResultSet + * @return + * @throws IoTDBConnectionException + * @throws StatementExecutionException + */ + public RowRecord next() throws IoTDBConnectionException, StatementExecutionException { + return sessionDataSet.next(); + } + + /** + * retrieve data set like jdbc + */ + public DataIterator iterator() { + return sessionDataSet.iterator(); + } + + public List<String> getColumnNames() { + return sessionDataSet.getColumnNames(); + } + + public List<TSDataType> getColumnTypes() { + return sessionDataSet.getColumnTypes(); + } + + /** + * close this dataset to release the session + */ + public void close() { + pool.closeResultSet(this); + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java new file mode 100644 index 0000000..a69fed1 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/serviceSession/pool/SessionPool.java @@ -0,0 +1,983 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.serviceSession.pool; + +import java.time.ZoneId; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import org.apache.iotdb.db.serviceSession.Config; +import org.apache.iotdb.db.serviceSession.Session; +import org.apache.iotdb.db.serviceSession.SessionDataSet; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.iotdb.service.rpc.thrift.TSInsertTabletsReq; +import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; +import org.apache.iotdb.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * SessionPool is a wrapper of a Session Set. Using SessionPool, the user do not need to consider + * how to reuse a session connection. Even if the session is disconnected, the session pool can + * recognize it and remove the broken session connection and create a new one. + * <p> + * If there is no available connections and the pool reaches its max size, the all methods will hang + * until there is a available connection. + * <p> + * If a user has waited for a session for more than 60 seconds, a warn log will be printed. + * <p> + * The only thing you have to remember is that: + * <p> + * For a query, if you have get all data, i.e., SessionDataSetWrapper.hasNext() == false, it is ok. + * Otherwise, i.e., you want to stop the query before you get all data + * (SessionDataSetWrapper.hasNext() == true), then you have to call closeResultSet(SessionDataSetWrapper + * wrapper) manually. Otherwise the connection is occupied by the query. + * <p> + * Another case that you have to manually call closeResultSet() is that when there is exception when + * you call SessionDataSetWrapper.hasNext() or next() + */ +public class SessionPool { + + private static final Logger logger = LoggerFactory.getLogger(SessionPool.class); + public static final String SESSION_POOL_IS_CLOSED = "Session pool is closed"; + public static final String CLOSE_THE_SESSION_FAILED = "close the session failed."; + private static int RETRY = 3; + private ConcurrentLinkedDeque<Session> queue = new ConcurrentLinkedDeque<>(); + //for session whose resultSet is not released. + private ConcurrentMap<Session, Session> occupied = new ConcurrentHashMap<>(); + private int size = 0; + private int maxSize = 0; + private String ip; + private int port; + private String user; + private String password; + private int fetchSize; + private long timeout; //ms + private static int FINAL_RETRY = RETRY - 1; + private boolean enableCompression = false; + private ZoneId zoneId; + + private boolean closed;//whether the queue is closed. + + public SessionPool(String ip, int port, String user, String password, int maxSize) { + this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, false, null); + } + + public SessionPool(String ip, int port, String user, String password, int maxSize, + boolean enableCompression) { + this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, enableCompression, + null); + } + + public SessionPool(String ip, int port, String user, String password, int maxSize, + ZoneId zoneId) { + this(ip, port, user, password, maxSize, Config.DEFAULT_FETCH_SIZE, 60_000, false, zoneId); + } + + @SuppressWarnings("squid:S107") + public SessionPool(String ip, int port, String user, String password, int maxSize, int fetchSize, + long timeout, boolean enableCompression, ZoneId zoneId) { + this.maxSize = maxSize; + this.ip = ip; + this.port = port; + this.user = user; + this.password = password; + this.fetchSize = fetchSize; + this.timeout = timeout; + this.enableCompression = enableCompression; + this.zoneId = zoneId; + } + + //if this method throws an exception, either the server is broken, or the ip/port/user/password is incorrect. + + @SuppressWarnings({"squid:S3776","squid:S2446"}) // Suppress high Cognitive Complexity warning + private Session getSession() throws IoTDBConnectionException { + Session session = queue.poll(); + if (closed) { + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); + } + if (session != null) { + return session; + } else { + long start = System.currentTimeMillis(); + boolean canCreate = false; + synchronized (this) { + if (size < maxSize) { + //we can create more session + size++; + canCreate = true; + //but we do it after skip synchronized block because connection a session is time consuming. + } + } + if (canCreate) { + //create a new one. + if (logger.isDebugEnabled()) { + logger.debug("Create a new Session {}, {}, {}, {}", ip, port, user, password); + } + session = new Session(ip, port, user, password, fetchSize, zoneId); + try { + session.open(enableCompression); + //avoid someone has called close() the session pool + synchronized (this) { + if (closed) { + //have to release the connection... + session.close(); + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); + } else { + return session; + } + } + } catch (IoTDBConnectionException e) { + //if exception, we will throw the exception. + //Meanwhile, we have to set size-- + synchronized (this) { + size--; + //we do not need to notifyAll as any waited thread can continue to work after waked up. + this.notify(); + if (logger.isDebugEnabled()) { + logger.debug("open session failed, reduce the count and notify others..."); + } + } + throw e; + } + } + else { + while (session == null) { + synchronized (this) { + if (closed) { + throw new IoTDBConnectionException(SESSION_POOL_IS_CLOSED); + } + //we have to wait for someone returns a session. + try { + if (logger.isDebugEnabled()) { + logger.debug("no more sessions can be created, wait... queue.size={}", queue.size()); + } + this.wait(1000); + long time = timeout < 60_000 ? timeout : 60_000; + if (System.currentTimeMillis() - start > time) { + logger.warn( + "the SessionPool has wait for {} seconds to get a new connection: {}:{} with {}, {}", + (System.currentTimeMillis() - start) / 1000, ip, port, user, password); + logger.warn("current occupied size {}, queue size {}, considered size {} ", + occupied.size(), queue.size(), size); + if (System.currentTimeMillis() - start > timeout) { + throw new IoTDBConnectionException( + String.format("timeout to get a connection from %s:%s", ip, port)); + } + } + } catch (InterruptedException e) { + logger.error("the SessionPool is damaged", e); + Thread.currentThread().interrupt(); + } + session = queue.poll(); + } + } + return session; + } + } + } + + public int currentAvailableSize() { + return queue.size(); + } + + public int currentOccupiedSize() { + return occupied.size(); + } + + @SuppressWarnings({"squid:S2446"}) + private void putBack(Session session) { + queue.push(session); + synchronized (this) { + //we do not need to notifyAll as any waited thread can continue to work after waked up. + this.notify(); + //comment the following codes as putBack is too frequently called. +// if (logger.isTraceEnabled()) { +// logger.trace("put a session back and notify others..., queue.size = {}", queue.size()); +// } + } + } + + private void occupy(Session session) { + occupied.put(session, session); + } + + /** + * close all connections in the pool + */ + public synchronized void close() { + for (Session session : queue) { + try { + session.close(); + } catch (IoTDBConnectionException e) { + //do nothing + logger.warn(CLOSE_THE_SESSION_FAILED, e); + } + } + for (Session session : occupied.keySet()) { + try { + session.close(); + } catch (IoTDBConnectionException e) { + //do nothing + logger.warn(CLOSE_THE_SESSION_FAILED, e); + } + } + logger.info("closing the session pool, cleaning queues..."); + this.closed = true; + queue.clear(); + occupied.clear(); + } + + public void closeResultSet(SessionDataSetWrapper wrapper) { + boolean putback = true; + try { + wrapper.sessionDataSet.closeOperationHandle(); + } catch (IoTDBConnectionException | StatementExecutionException e) { + removeSession(); + putback = false; + } finally { + Session session = occupied.remove(wrapper.session); + if (putback && session != null) { + putBack(wrapper.session); + } + } + } + + @SuppressWarnings({"squid:S2446"}) + private synchronized void removeSession() { + logger.warn("Remove a broken Session {}, {}, {}", ip, port, user); + size--; + //we do not need to notifyAll as any waited thread can continue to work after waked up. + this.notify(); + if (logger.isDebugEnabled()) { + logger.debug("remove a broken session and notify others..., queue.size = {}", queue.size()); + } + } + + private void closeSession(Session session) { + if (session != null) { + try { + session.close(); + } catch (Exception e2) { + //do nothing. We just want to guarantee the session is closed. + logger.warn(CLOSE_THE_SESSION_FAILED, e2); + } + } + } + + private void cleanSessionAndMayThrowConnectionException(Session session, int times, + IoTDBConnectionException e) throws IoTDBConnectionException { + closeSession(session); + removeSession(); + if (times == FINAL_RETRY) { + throw new IoTDBConnectionException( + String.format("retry to execute statement on %s:%s failed %d times: %s", ip, port, + RETRY, e.getMessage()), e); + } + } + + /** + * insert the data of a device. For each timestamp, the number of measurements is the same. + * + * a Tablet example: + * + * device1 + * time s1, s2, s3 + * 1, 1, 1, 1 + * 2, 2, 2, 2 + * 3, 3, 3, 3 + * + * times in Tablet may be not in ascending order + * + * @param tablet data batch + */ + public void insertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + insertTablet(tablet, false); + } + + /** + * insert the data of a device. For each timestamp, the number of measurements is the same. + * + * a Tablet example: + * + * device1 + * time s1, s2, s3 + * 1, 1, 1, 1 + * 2, 2, 2, 2 + * 3, 3, 3, 3 + * + * Users need to control the count of Tablet and write a batch when it reaches the maxBatchSize + * + * @param tablet a tablet data of one device + * @param sorted whether times in Tablet are in ascending order + */ + public void insertTablet(Tablet tablet, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertTablet(tablet, sorted); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertTablet failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + + /** + * use batch interface to insert data + * + * @param tablets multiple batch + */ + public void insertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + insertTablets(tablets, false); + } + + /** + * use batch interface to insert data + * + * @param tablets multiple batch + */ + public void insertTablets(Map<String, Tablet> tablets, boolean sorted) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertTablets(tablets, sorted); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertTablets failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * use batch interface to insert data + */ + public void insertTablets(TSInsertTabletsReq req) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertTablets(req); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertTablets failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * Insert data in batch format, which can reduce the overhead of network. This method is just like + * jdbc batch insert, we pack some insert request in batch and send them to server If you want + * improve your performance, please see insertTablet method + * + * @see Session#insertTablet(Tablet) + */ + public void insertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertRecords(deviceIds, times, measurementsList, typesList, valuesList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertRecords failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + + /** + * Insert data in batch format, which can reduce the overhead of network. This method is just like + * jdbc batch insert, we pack some insert request in batch and send them to server If you want + * improve your performance, please see insertTablet method + * + * @see Session#insertTablet(Tablet) + */ + public void insertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertRecords(deviceIds, times, measurementsList, valuesList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertRecords failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * insert data in one row, if you want improve your performance, please use insertRecords method + * or insertTablet method + * + * @see Session#insertRecords(List, List, List, List, List) + * @see Session#insertTablet(Tablet) + */ + public void insertRecord(String deviceId, long time, List<String> measurements, + List<TSDataType> types, List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertRecord(deviceId, time, measurements, types, values); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertRecord failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * insert data in one row, if you want improve your performance, please use insertRecords method + * or insertTablet method + * + * @see Session#insertRecords(List, List, List, List, List) + * @see Session#insertTablet(Tablet) + */ + public void insertRecord(String deviceId, long time, List<String> measurements, + List<String> values) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.insertRecord(deviceId, time, measurements, values); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("insertRecord failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablet(Tablet tablet) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertTablet(tablet); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertTablet failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertTablets(Map<String, Tablet> tablets) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertTablets(tablets); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertTablets failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<String>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertRecords(deviceIds, times, measurementsList, valuesList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertRecords failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecords(List<String> deviceIds, List<Long> times, + List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertRecords(deviceIds, times, measurementsList, typesList, valuesList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertRecords failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecord(String deviceId, long time, List<String> measurements, + List<String> values) throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertRecord(deviceId, time, measurements, values); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertRecord failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * This method NOT insert data into database and the server just return after accept the request, + * this method should be used to test other time cost in client + */ + public void testInsertRecord(String deviceId, long time, List<String> measurements, + List<TSDataType> types, List<Object> values) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.testInsertRecord(deviceId, time, measurements, types, values); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("testInsertRecord failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException | RuntimeException e) { + putBack(session); + throw e; + } + } + } + + /** + * delete a timeseries, including data and schema + * + * @param path timeseries to delete, should be a whole path + */ + public void deleteTimeseries(String path) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteTimeseries(path); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteTimeseries failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * delete a timeseries, including data and schema + * + * @param paths timeseries to delete, should be a whole path + */ + public void deleteTimeseries(List<String> paths) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteTimeseries(paths); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteTimeseries failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * delete data <= time in one timeseries + * + * @param path data in which time series to delete + * @param time data with time stamp less than or equal to time will be deleted + */ + public void deleteData(String path, long time) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteData(path, time); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteData failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * delete data <= time in multiple timeseries + * + * @param paths data in which time series to delete + * @param time data with time stamp less than or equal to time will be deleted + */ + public void deleteData(List<String> paths, long time) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteData(paths, time); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteData failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + /** + * delete data >= startTime and data <= endTime in multiple timeseries + * + * @param paths data in which time series to delete + * @param startTime delete range start time + * @param endTime delete range end time + */ + public void deleteData(List<String> paths, long startTime, long endTime) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteData(paths, startTime, endTime); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteData failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void setStorageGroup(String storageGroupId) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.setStorageGroup(storageGroupId); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("setStorageGroup failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void deleteStorageGroup(String storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteStorageGroup(storageGroup); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteStorageGroup failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void deleteStorageGroups(List<String> storageGroup) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.deleteStorageGroups(storageGroup); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("deleteStorageGroups failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding, + CompressionType compressor) throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.createTimeseries(path, dataType, encoding, compressor); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("createTimeseries failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void createTimeseries(String path, TSDataType dataType, TSEncoding encoding, + CompressionType compressor, Map<String, String> props, Map<String, String> tags, + Map<String, String> attributes, String measurementAlias) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.createTimeseries(path, dataType, encoding, compressor, props, tags, attributes, + measurementAlias); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("createTimeseries failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public void createMultiTimeseries(List<String> paths, List<TSDataType> dataTypes, + List<TSEncoding> encodings, List<CompressionType> compressors, + List<Map<String, String>> propsList, List<Map<String, String>> tagsList, + List<Map<String, String>> attributesList, List<String> measurementAliasList) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.createMultiTimeseries(paths, dataTypes, encodings, compressors, propsList, tagsList, + attributesList, measurementAliasList); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("createMultiTimeseries failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } + + public boolean checkTimeseriesExists(String path) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + boolean resp = session.checkTimeseriesExists(path); + putBack(session); + return resp; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("checkTimeseriesExists failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + //never go here. + return false; + } + + /** + * execure query sql users must call closeResultSet(SessionDataSetWrapper) if they do not use the + * SessionDataSet any more. users do not need to call sessionDataSet.closeOpeationHandler() any + * more. + * + * @param sql query statement + * @return result set Notice that you must get the result instance. Otherwise a data leakage will + * happen + */ + public SessionDataSetWrapper executeQueryStatement(String sql) + throws IoTDBConnectionException, StatementExecutionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + SessionDataSet resp = session.executeQueryStatement(sql); + SessionDataSetWrapper wrapper = new SessionDataSetWrapper(resp, session, this); + occupy(session); + return wrapper; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("executeQueryStatement failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + // never go here + return null; + } + + /** + * execute non query statement + * + * @param sql non query statement + */ + public void executeNonQueryStatement(String sql) + throws StatementExecutionException, IoTDBConnectionException { + for (int i = 0; i < RETRY; i++) { + Session session = getSession(); + try { + session.executeNonQueryStatement(sql); + putBack(session); + return; + } catch (IoTDBConnectionException e) { + // TException means the connection is broken, remove it and get a new one. + logger.warn("executeNonQueryStatement failed", e); + cleanSessionAndMayThrowConnectionException(session, i, e); + } catch (StatementExecutionException e) { + putBack(session); + throw e; + } + } + } +} diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java new file mode 100644 index 0000000..a9511a4 --- /dev/null +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMultiDeviceIT.java @@ -0,0 +1,322 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.db.integration; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.constant.TestConstant; +import org.apache.iotdb.db.engine.compaction.CompactionStrategy; +import org.apache.iotdb.db.utils.EnvironmentUtils; +import org.apache.iotdb.jdbc.Config; +import org.apache.iotdb.tsfile.common.conf.TSFileConfig; +import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Notice that, all test begins with "IoTDB" is integration test. All test which will start the + * IoTDB server should be defined as integration test. + */ +public class IoTDBMultiDeviceIT { + + private static TSFileConfig tsFileConfig = TSFileDescriptor.getInstance().getConfig(); + private static int maxNumberOfPointsInPage; + private static int pageSizeInByte; + private static int groupSizeInByte; + private static long prevPartitionInterval; + + @Before + public void setUp() throws Exception { + + EnvironmentUtils.closeStatMonitor(); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.NO_COMPACTION); + + // use small page setting + // origin value + maxNumberOfPointsInPage = tsFileConfig.getMaxNumberOfPointsInPage(); + pageSizeInByte = tsFileConfig.getPageSizeInByte(); + groupSizeInByte = tsFileConfig.getGroupSizeInByte(); + + // new value + tsFileConfig.setMaxNumberOfPointsInPage(1000); + tsFileConfig.setPageSizeInByte(1024 * 150); + tsFileConfig.setGroupSizeInByte(1024 * 1000); + IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 1000); + prevPartitionInterval = IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(); + IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(100); + TSFileDescriptor.getInstance().getConfig().setCompressor("LZ4"); + + EnvironmentUtils.envSetUp(); + + insertData(); + } + + @After + public void tearDown() throws Exception { + // recovery value + tsFileConfig.setMaxNumberOfPointsInPage(maxNumberOfPointsInPage); + tsFileConfig.setPageSizeInByte(pageSizeInByte); + tsFileConfig.setGroupSizeInByte(groupSizeInByte); + EnvironmentUtils.cleanEnv(); + IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(prevPartitionInterval); + IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte); + TSFileDescriptor.getInstance().getConfig().setCompressor("SNAPPY"); + IoTDBDescriptor.getInstance().getConfig() + .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION); + } + + private static void insertData() + throws ClassNotFoundException, SQLException { + Class.forName(Config.JDBC_DRIVER_NAME); + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + for (String sql : TestConstant.create_sql) { + statement.execute(sql); + } + + statement.execute("SET STORAGE GROUP TO root.fans"); + statement.execute("CREATE TIMESERIES root.fans.d0.s0 WITH DATATYPE=INT32, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.fans.d1.s0 WITH DATATYPE=INT32, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.fans.d2.s0 WITH DATATYPE=INT32, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.fans.d3.s0 WITH DATATYPE=INT32, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.car.d0.s1 WITH DATATYPE=INT64, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.car.d1.s1 WITH DATATYPE=INT64, ENCODING=RLE"); + statement.execute("CREATE TIMESERIES root.car.d2.s1 WITH DATATYPE=INT64, ENCODING=RLE"); + + + + // insert of data time range :0-1000 into fans + for (int time = 0; time < 1000; time++) { + + String sql = String + .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + } + + // insert large amount of data time range : 13700 ~ 24000 + for (int time = 13700; time < 24000; time++) { + + String sql = String + .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + } + + // insert large amount of data time range : 3000 ~ 13600 + for (int time = 3000; time < 13600; time++) { + // System.out.println("===" + time); + String sql = String + .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + } + + statement.execute("flush"); + statement.execute("merge"); + + // unsequential data, memory data + for (int time = 10000; time < 11000; time++) { + + String sql = String + .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + } + + // sequential data, memory data + for (int time = 200000; time < 201000; time++) { + + String sql = String + .format("insert into root.fans.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.fans.d2(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.fans.d3(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d0(timestamp,s0) values(%s,%s)", time, time % 70); + statement.execute(sql); + sql = String + .format("insert into root.car.d1(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + sql = String + .format("insert into root.car.d2(timestamp,s0) values(%s,%s)", time, time % 40); + statement.execute(sql); + } + + + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // "select * from root.vehicle" : test select wild data + @Test + public void selectAllTest() throws ClassNotFoundException { + String selectSql = "select * from root"; + + Class.forName(Config.JDBC_DRIVER_NAME); + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + boolean hasResultSet = statement.execute(selectSql); + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int cnt = 0; + long before = -1; + while (resultSet.next()) { + long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR)); + if(cur <= before){ + fail("time order wrong!"); + } + before = cur; + cnt++; + } + assertEquals(22900, cnt); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + // "select * from root.vehicle" : test select wild data + @Test + public void selectAfterDeleteTest() throws ClassNotFoundException { + String selectSql = "select * from root"; + + Class.forName(Config.JDBC_DRIVER_NAME); + try (Connection connection = DriverManager + .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root"); + Statement statement = connection.createStatement()) { + + statement.execute("DELETE FROM root.fans.* WHERE time <= 1000"); + statement.execute("DELETE FROM root.car.* WHERE time <= 1000"); + statement.execute("DELETE FROM root.fans.* WHERE time >= 200500 and time < 201000"); + statement.execute("DELETE FROM root.car.* WHERE time >= 200500 and time < 201000"); + + + boolean hasResultSet = statement.execute(selectSql); + Assert.assertTrue(hasResultSet); + try (ResultSet resultSet = statement.getResultSet()) { + int cnt = 0; + long before = -1; + while (resultSet.next()) { + long cur = Long.parseLong(resultSet.getString(TestConstant.TIMESTAMP_STR)); + if(cur <= before){ + fail("time order wrong!"); + } + before = cur; + cnt++; + } + assertEquals(21400, cnt); + } + } catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } +} diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift index 7f29bbd..827cc3a 100644 --- a/thrift/src/main/thrift/rpc.thrift +++ b/thrift/src/main/thrift/rpc.thrift @@ -221,6 +221,7 @@ struct TSInsertTabletsReq { 5: required list<binary> timestampsList 6: required list<list<i32>> typesList 7: required list<i32> sizeList + 8: optional bool isFinal } struct TSInsertRecordsReq {
