This is an automated email from the ASF dual-hosted git repository. sunzesong pushed a commit to branch jira-1103 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9baba06efc1f5b7838735721909e01dbcdab5ec2 Author: samperson1997 <[email protected]> AuthorDate: Tue Jan 5 16:55:25 2021 +0800 [IOTDB-1103] Fix frame size larger than max length error --- .../main/java/org/apache/iotdb/jdbc/Config.java | 9 ++++ .../org/apache/iotdb/jdbc/IoTDBConnection.java | 24 ++++++---- .../apache/iotdb/jdbc/IoTDBConnectionParams.java | 10 ++++ .../iotdb/db/sync/sender/transfer/SyncClient.java | 43 ++++++++--------- .../main/java/org/apache/iotdb/session/Config.java | 10 ++++ .../java/org/apache/iotdb/session/Session.java | 54 +++++++++++++++------- 6 files changed, 103 insertions(+), 47 deletions(-) diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java index 9025607..457f732 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/Config.java @@ -54,4 +54,13 @@ public class Config { public static boolean rpcThriftCompressionEnable = false; + /** + * thrift init buffer size, 1KB by default + */ + public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024; + + /** + * thrift max frame size (16384000 bytes by default), we change it to 64MB + */ + public static final int DEFAULT_MAX_FRAME_SIZE = 67108864; } diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java index 85464cd..5a0578d 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnection.java @@ -40,7 +40,14 @@ import java.util.Properties; import java.util.concurrent.Executor; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.StatementExecutionException; -import org.apache.iotdb.service.rpc.thrift.*; +import org.apache.iotdb.service.rpc.thrift.ServerProperties; +import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSIService; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionReq; +import org.apache.iotdb.service.rpc.thrift.TSOpenSessionResp; +import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion; +import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq; +import org.apache.iotdb.service.rpc.thrift.TSStatus; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TCompactProtocol; @@ -78,10 +85,9 @@ public class IoTDBConnection implements Connection { params = Utils.parseUrl(url, info); openTransport(); - if(Config.rpcThriftCompressionEnable) { + if (Config.rpcThriftCompressionEnable) { setClient(new TSIService.Client(new TCompactProtocol(transport))); - } - else { + } else { setClient(new TSIService.Client(new TBinaryProtocol(transport))); } // open client session @@ -120,7 +126,8 @@ public class IoTDBConnection implements Connection { try { getClient().closeSession(req); } catch (TException e) { - throw new SQLException("Error occurs when closing session at server. Maybe server is down.", e); + throw new SQLException("Error occurs when closing session at server. Maybe server is down.", + e); } finally { isClosed = true; if (transport != null) { @@ -408,7 +415,7 @@ public class IoTDBConnection implements Connection { private void openTransport() throws TTransportException { transport = new TFastFramedTransport(new TSocket(params.getHost(), params.getPort(), - Config.connectionTimeoutInMs)); + Config.connectionTimeoutInMs), params.getInitialBufferCapacity(), params.getMaxFrameSize()); if (!transport.isOpen()) { transport.open(); } @@ -463,10 +470,9 @@ public class IoTDBConnection implements Connection { if (transport != null) { transport.close(); openTransport(); - if(Config.rpcThriftCompressionEnable) { + if (Config.rpcThriftCompressionEnable) { setClient(new TSIService.Client(new TCompactProtocol(transport))); - } - else { + } else { setClient(new TSIService.Client(new TBinaryProtocol(transport))); } openSession(); diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java index dfd1772..1bbb059 100644 --- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java +++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBConnectionParams.java @@ -27,6 +27,9 @@ public class IoTDBConnectionParams { private String username = Config.DEFAULT_USER; private String password = Config.DEFALUT_PASSWORD; + private int initialBufferCapacity = Config.DEFAULT_INITIAL_BUFFER_CAPACITY; + private int maxFrameSize = Config.DEFAULT_MAX_FRAME_SIZE; + public IoTDBConnectionParams(String url) { this.jdbcUriString = url; } @@ -79,4 +82,11 @@ public class IoTDBConnectionParams { this.password = password; } + public int getInitialBufferCapacity() { + return initialBufferCapacity; + } + + public int getMaxFrameSize() { + return maxFrameSize; + } } diff --git a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java index fdee50c..cf8a744 100644 --- a/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java +++ b/server/src/main/java/org/apache/iotdb/db/sync/sender/transfer/SyncClient.java @@ -53,6 +53,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.io.FileUtils; import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.db.concurrent.ThreadName; +import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.storagegroup.TsFileResource; @@ -89,6 +90,8 @@ public class SyncClient implements ISyncClient { private static SyncSenderConfig config = SyncSenderDescriptor.getInstance().getConfig(); + private static final IoTDBConfig ioTDBConfig = IoTDBDescriptor.getInstance().getConfig(); + private static final int BATCH_LINE = 1000; private static final int TIMEOUT_MS = 1000; @@ -233,9 +236,9 @@ public class SyncClient implements ISyncClient { syncSchema(); // 3. Sync all data - String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs(); + String[] dataDirs = ioTDBConfig.getDataDirs(); logger.info("There are {} data dirs to be synced.", dataDirs.length); - for (int i = 0 ; i < dataDirs.length; i++) { + for (int i = 0; i < dataDirs.length; i++) { String dataDir = dataDirs[i]; logger.info("Start to sync data in data dir {}, the process is {}/{}", dataDir, i + 1, dataDirs.length); @@ -274,9 +277,10 @@ public class SyncClient implements ISyncClient { @Override public void establishConnection(String serverIp, int serverPort) throws SyncConnectionException { - transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS)); - TProtocol protocol = null; - if (IoTDBDescriptor.getInstance().getConfig().isRpcThriftCompressionEnable()) { + transport = new TFastFramedTransport(new TSocket(serverIp, serverPort, TIMEOUT_MS), + ioTDBConfig.getThriftInitBufferSize(), ioTDBConfig.getThriftMaxFrameSize()); + TProtocol protocol; + if (ioTDBConfig.isRpcThriftCompressionEnable()) { protocol = new TCompactProtocol(transport); } else { protocol = new TBinaryProtocol(transport); @@ -296,10 +300,9 @@ public class SyncClient implements ISyncClient { public void confirmIdentity() throws SyncConnectionException { try (Socket socket = new Socket(config.getServerIp(), config.getServerPort())) { ConfirmInfo info = new ConfirmInfo(socket.getLocalAddress().getHostAddress(), - getOrCreateUUID(getUuidFile()), - IoTDBDescriptor.getInstance().getConfig().getPartitionInterval(), IoTDBConstant.VERSION); - SyncStatus status = serviceClient - .check(info); + getOrCreateUUID(getUuidFile()), ioTDBConfig.getPartitionInterval(), + IoTDBConstant.VERSION); + SyncStatus status = serviceClient.check(info); if (status.code != SUCCESS_CODE) { throw new SyncConnectionException( "The receiver rejected the synchronization task because " + status.msg); @@ -432,14 +435,14 @@ public class SyncClient implements ISyncClient { if (syncSchemaLogFile.exists()) { try (BufferedReader br = new BufferedReader(new FileReader(syncSchemaLogFile))) { String pos = br.readLine(); - if(pos != null) { + if (pos != null) { return Integer.parseInt(pos); } } } } catch (IOException e) { logger.error("Can not find file {}", syncSchemaLogFile.getAbsoluteFile(), e); - } catch (NumberFormatException e){ + } catch (NumberFormatException e) { logger.error("Sync schema pos is not valid", e); } return 0; @@ -514,8 +517,8 @@ public class SyncClient implements ISyncClient { } @Override - public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, Set<File> deletedFilesName) - throws IOException { + public void syncDeletedFilesNameInOneGroup(String sgName, Long timeRangeId, + Set<File> deletedFilesName) throws IOException { if (deletedFilesName.isEmpty()) { logger.info("There has no deleted files to be synced in storage group {}", sgName); return; @@ -615,7 +618,7 @@ public class SyncClient implements ISyncClient { ByteBuffer buffToSend = ByteBuffer.wrap(bos.toByteArray()); bos.reset(); SyncStatus status = serviceClient.syncData(buffToSend); - if(status.code == CONFLICT_CODE){ + if (status.code == CONFLICT_CODE) { throw new SyncDeviceOwnerConflictException(status.msg); } if (status.code != SUCCESS_CODE) { @@ -649,7 +652,7 @@ public class SyncClient implements ISyncClient { try (BufferedWriter bw = new BufferedWriter(new FileWriter(currentLocalFile))) { for (Map<Long, Set<File>> currentLocalFiles : lastLocalFilesMap.values()) { for (Set<File> files : currentLocalFiles.values()) { - for(File file: files) { + for (File file : files) { bw.write(file.getAbsolutePath()); bw.newLine(); } @@ -677,23 +680,21 @@ public class SyncClient implements ISyncClient { private File getSchemaPosFile() { - return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(), + return new File(ioTDBConfig.getSyncDir(), config.getSyncReceiverName() + File.separator + SyncConstant.SCHEMA_POS_FILE_NAME); } private File getSchemaLogFile() { - return new File(IoTDBDescriptor.getInstance().getConfig().getSchemaDir(), - MetadataConstant.METADATA_LOG); + return new File(ioTDBConfig.getSchemaDir(), MetadataConstant.METADATA_LOG); } private File getLockFile() { - return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(), + return new File(ioTDBConfig.getSyncDir(), config.getSyncReceiverName() + File.separator + SyncConstant.LOCK_FILE_NAME); } private File getUuidFile() { - return new File(IoTDBDescriptor.getInstance().getConfig().getSyncDir(), - SyncConstant.UUID_FILE_NAME); + return new File(ioTDBConfig.getSyncDir(), SyncConstant.UUID_FILE_NAME); } private static class InstanceHolder { diff --git a/session/src/main/java/org/apache/iotdb/session/Config.java b/session/src/main/java/org/apache/iotdb/session/Config.java index b7cfb24..3d31eab 100644 --- a/session/src/main/java/org/apache/iotdb/session/Config.java +++ b/session/src/main/java/org/apache/iotdb/session/Config.java @@ -26,4 +26,14 @@ public class Config { public static final int DEFAULT_TIMEOUT_MS = 0; public static final int RETRY_NUM = 3; public static final long RETRY_INTERVAL_MS = 1000; + + /** + * thrift init buffer size, 1KB by default + */ + public static final int DEFAULT_INITIAL_BUFFER_CAPACITY = 1024; + + /** + * thrift max frame size (16384000 bytes by default), we change it to 64MB + */ + public static final int DEFAULT_MAX_FRAME_SIZE = 67108864; } diff --git a/session/src/main/java/org/apache/iotdb/session/Session.java b/session/src/main/java/org/apache/iotdb/session/Session.java index ffb65ad..beb9056 100644 --- a/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/session/src/main/java/org/apache/iotdb/session/Session.java @@ -76,6 +76,8 @@ public class Session { private int rpcPort; private String username; private String password; + private int initialBufferCapacity; + private int maxFrameSize; private TSIService.Iface client = null; private long sessionId; private TTransport transport; @@ -87,32 +89,46 @@ public class Session { private int connectionTimeoutInMs; public Session(String host, int rpcPort) { - this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, null); + this(host, rpcPort, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, Config.DEFAULT_FETCH_SIZE, + null, Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); } public Session(String host, String rpcPort, String username, String password) { - this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null); + this(host, Integer.parseInt(rpcPort), username, password, Config.DEFAULT_FETCH_SIZE, null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); } public Session(String host, int rpcPort, String username, String password) { - this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null); + this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); } public Session(String host, int rpcPort, String username, String password, int fetchSize) { - this(host, rpcPort, username, password, fetchSize, null); + this(host, rpcPort, username, password, fetchSize, null, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); } public Session(String host, int rpcPort, String username, String password, ZoneId zoneId) { - this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId); + this(host, rpcPort, username, password, Config.DEFAULT_FETCH_SIZE, zoneId, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); } - public Session(String host, int rpcPort, String username, String password, int fetchSize, ZoneId zoneId) { + public Session(String host, int rpcPort, String username, String password, int fetchSize, + ZoneId zoneId) { + this(host, rpcPort, username, password, fetchSize, zoneId, + Config.DEFAULT_INITIAL_BUFFER_CAPACITY, Config.DEFAULT_MAX_FRAME_SIZE); + } + + public Session(String host, int rpcPort, String username, String password, int fetchSize, + ZoneId zoneId, int initialBufferCapacity, int maxFrameSize) { this.host = host; this.rpcPort = rpcPort; this.username = username; this.password = password; this.fetchSize = fetchSize; this.zoneId = zoneId; + this.initialBufferCapacity = initialBufferCapacity; + this.maxFrameSize = maxFrameSize; } public synchronized void open() throws IoTDBConnectionException { @@ -132,7 +148,8 @@ public class Session { this.enableRPCCompression = enableRPCCompression; this.connectionTimeoutInMs = connectionTimeoutInMs; - transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs)); + transport = new TFastFramedTransport(new TSocket(host, rpcPort, connectionTimeoutInMs), + initialBufferCapacity, maxFrameSize); if (!transport.isOpen()) { try { @@ -393,8 +410,9 @@ public class Session { List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList) throws IoTDBConnectionException, StatementExecutionException { - insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false); + insertRecordsOfOneDevice(deviceId, times, measurementsList, typesList, valuesList, false); } + /** * Insert multiple rows, which can reduce the overhead of network. This method is just like jdbc * executeBatch, we pack some insert request in batch and send them to server. If you want improve @@ -402,15 +420,15 @@ public class Session { * <p> * Each row is independent, which could have different deviceId, time, number of measurements * - * @param haveSorted whether the times have been sorted + * @param haveSorted whether the times have been sorted * @see Session#insertTablet(Tablet) */ public void insertRecordsOfOneDevice(String deviceId, List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, StatementExecutionException { - TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, measurementsList, - typesList, valuesList, haveSorted); + TSInsertRecordsOfOneDeviceReq request = genTSInsertRecordsOfOneDeviceReq(deviceId, times, + measurementsList, typesList, valuesList, haveSorted); try { RpcUtils.verifySuccess(client.insertRecordsOfOneDevice(request)); } catch (TException e) { @@ -427,9 +445,10 @@ public class Session { } } - private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, List<Long> times, - List<List<String>> measurementsList, List<List<TSDataType>> typesList, - List<List<Object>> valuesList, boolean haveSorted) throws IoTDBConnectionException, BatchExecutionException { + private TSInsertRecordsOfOneDeviceReq genTSInsertRecordsOfOneDeviceReq(String deviceId, + List<Long> times, List<List<String>> measurementsList, List<List<TSDataType>> typesList, + List<List<Object>> valuesList, boolean haveSorted) + throws IoTDBConnectionException, BatchExecutionException { // check params size int len = times.size(); if (len != measurementsList.size() || len != valuesList.size()) { @@ -439,7 +458,8 @@ public class Session { if (haveSorted) { if (!checkSorted(times)) { - throw new BatchExecutionException("Times in InsertOneDeviceRecords are not in ascending order"); + throw new BatchExecutionException( + "Times in InsertOneDeviceRecords are not in ascending order"); } } else { //sort @@ -478,8 +498,8 @@ public class Session { return Arrays.asList(result); } - private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, List<List<TSDataType>> typesList) - throws IoTDBConnectionException { + private List<ByteBuffer> objectValuesListToByteBufferList(List<List<Object>> valuesList, + List<List<TSDataType>> typesList) throws IoTDBConnectionException { List<ByteBuffer> buffersList = new ArrayList<>(); for (int i = 0; i < valuesList.size(); i++) { ByteBuffer buffer = ByteBuffer.allocate(calculateLength(typesList.get(i), valuesList.get(i)));
