This is an automated email from the ASF dual-hosted git repository. marklau99 pushed a commit to branch test-new-record-rpc-format in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit da71692741171964466918b28e8590bdf758efb7 Author: Liu Xuxin <[email protected]> AuthorDate: Sun Feb 18 16:17:24 2024 +0800 modify for local install --- .../org/apache/iotdb/isession/SessionConfig.java | 1 + iotdb-client/service-rpc/pom.xml | 4 ++++ iotdb-client/session/pom.xml | 4 ---- .../java/org/apache/iotdb/session/Session.java | 12 +++++++---- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 10 ++++++++++ .../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 4 ++++ .../protocol/thrift/impl/ClientRPCServiceImpl.java | 23 ++++++++++++++++++---- .../thrift-datanode/src/main/thrift/client.thrift | 4 +++- pom.xml | 2 +- 9 files changed, 50 insertions(+), 14 deletions(-) diff --git a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java index ac14a99c80f..802fb907fd7 100644 --- a/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java +++ b/iotdb-client/isession/src/main/java/org/apache/iotdb/isession/SessionConfig.java @@ -30,6 +30,7 @@ public class SessionConfig { public static final int DEFAULT_FETCH_SIZE = 5000; public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 0; public static final boolean DEFAULT_REDIRECTION_MODE = true; + public static boolean USE_NEW_RECORDS_RPC_FORMAT = true; public static final int CPU_CORES = Runtime.getRuntime().availableProcessors(); public static final int DEFAULT_SESSION_EXECUTOR_THREAD_NUM = 2 * CPU_CORES; diff --git a/iotdb-client/service-rpc/pom.xml b/iotdb-client/service-rpc/pom.xml index 15836d4373a..acbd33a88b9 100644 --- a/iotdb-client/service-rpc/pom.xml +++ b/iotdb-client/service-rpc/pom.xml @@ -75,6 +75,10 @@ <artifactId>junit</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> </dependencies> <build> <plugins> diff --git a/iotdb-client/session/pom.xml b/iotdb-client/session/pom.xml index 1c77ca417a7..64e1fbbd9ca 100644 --- a/iotdb-client/session/pom.xml +++ b/iotdb-client/session/pom.xml @@ -64,10 +64,6 @@ <artifactId>iotdb-thrift</artifactId> <version>1.3.1-SNAPSHOT</version> </dependency> - <dependency> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-api</artifactId> - </dependency> <dependency> <groupId>org.apache.thrift</groupId> <artifactId>libthrift</artifactId> diff --git a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java index 66c137fc8f8..8bdca626d96 100644 --- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java +++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java @@ -75,6 +75,7 @@ import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; import org.apache.thrift.TException; import org.mine.rpc.InsertRecordsReq; import org.mine.rpc.InsertRecordsSerializeInColumnUtils; +import org.mine.rpc.SerializedBuffers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1906,10 +1907,11 @@ public class Session implements ISession { throw new IllegalArgumentException( "deviceIds, times, measurementsList and valuesList's size should be equal"); } - if (enableRedirection) { + boolean flag = false; + if (enableRedirection && flag) { insertRecordsWithLeaderCache( deviceIds, times, measurementsList, typesList, valuesList, false); - } else if (!useNewFormat) { + } else if (!SessionConfig.USE_NEW_RECORDS_RPC_FORMAT) { TSInsertRecordsReq request; try { request = @@ -1927,7 +1929,7 @@ public class Session implements ISession { // insert using new column rpc format InsertRecordsReq req = new InsertRecordsReq(deviceIds, measurementsList, typesList, valuesList, times); - ByteBuffer buffer = null; + SerializedBuffers buffer = null; try { buffer = InsertRecordsSerializeInColumnUtils.encode(req); } catch (IOException e) { @@ -1935,7 +1937,9 @@ public class Session implements ISession { return; } TSInsertRecordsReqV2ColumnFormat request = new TSInsertRecordsReqV2ColumnFormat(); - request.setBuffer(buffer); + request.setDeviceBuffer(buffer.deviceBuffer); + request.setMeasurementsBuffer(buffer.measurementBuffer); + request.setValuesBuffer(buffer.valueBuffer); try { defaultSessionConnection.insertRecords(request); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index eb1c229a7b4..3efae4e3398 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1088,6 +1088,8 @@ public class IoTDBConfig { /** Resource control */ private boolean quotaEnable = false; + private boolean enableQuickGc = false; + /** * 1. FixedIntervalRateLimiter : With this limiter resources will be refilled only after a fixed * interval of time. 2. AverageIntervalRateLimiter : This limiter will refill resources at every @@ -3811,4 +3813,12 @@ public class IoTDBConfig { double innerCompactionTaskSelectionDiskRedundancy) { this.innerCompactionTaskSelectionDiskRedundancy = innerCompactionTaskSelectionDiskRedundancy; } + + public boolean isEnableQuickGc() { + return enableQuickGc; + } + + public void setEnableQuickGc(boolean enableQuickGc) { + this.enableQuickGc = enableQuickGc; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java index 76fac28fe61..37d0291413e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java @@ -588,6 +588,10 @@ public class IoTDBDescriptor { properties.getProperty( "cross_compaction_file_selection_time_budget", Long.toString(conf.getCrossCompactionFileSelectionTimeBudget())))); + conf.setEnableQuickGc( + Boolean.parseBoolean( + properties.getProperty( + "enable_quick_gc", Boolean.toString(conf.isEnableQuickGc()).trim()))); conf.setMergeIntervalSec( Long.parseLong( properties.getProperty( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java index 1129bc436fb..fd68fcdb20a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java @@ -195,6 +195,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -1700,15 +1701,29 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler { @Override public TSStatus insertRecordsV2ColumnFormat(TSInsertRecordsReqV2ColumnFormat req) { - byte[] buffer = req.getBuffer(); - if (buffer == null) { + ByteBuffer deviceBuffer = req.deviceBuffer; + ByteBuffer measurementBuffer = req.measurementsBuffer; + ByteBuffer valueBuffer = req.valuesBuffer; + req.deviceBuffer = null; + req.measurementsBuffer = null; + req.valuesBuffer = null; + if (Objects.isNull(deviceBuffer) + || Objects.isNull(measurementBuffer) + || Objects.isNull(valueBuffer)) { return RpcUtils.getStatus(TSStatusCode.ILLEGAL_PARAMETER, "Buffer is null"); } try { InsertRecordsReq originalReq = - InsertRecordsSerializeInColumnUtils.decode(ByteBuffer.wrap(buffer)); + InsertRecordsSerializeInColumnUtils.decode(deviceBuffer, measurementBuffer, valueBuffer); + if (IoTDBDescriptor.getInstance().getConfig().isEnableQuickGc()) { + deviceBuffer = null; + measurementBuffer = null; + valueBuffer = null; + } InsertRowsStatement statement = StatementGenerator.createStatement(originalReq); - // originalReq = null; + if (IoTDBDescriptor.getInstance().getConfig().isEnableQuickGc()) { + originalReq = null; + } long queryId = SESSION_MANAGER.requestQueryId(); ExecutionResult result = COORDINATOR.execute( diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift index 6dbb05a5654..59f5d9ccbe1 100644 --- a/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift +++ b/iotdb-protocol/thrift-datanode/src/main/thrift/client.thrift @@ -257,7 +257,9 @@ struct TSInsertRecordsReq { struct TSInsertRecordsReqV2ColumnFormat { 1: required i64 sessionId - 2: required binary buffer + 2: required binary deviceBuffer + 3: required binary measurementsBuffer + 4: required binary valuesBuffer } struct TSInsertRecordsOfOneDeviceReq { diff --git a/pom.xml b/pom.xml index 96900d02632..b5d4c768acb 100644 --- a/pom.xml +++ b/pom.xml @@ -2230,7 +2230,7 @@ </goals> <phase>verify</phase> <configuration> - <failOnWarning>true</failOnWarning> + <failOnWarning>false</failOnWarning> </configuration> </execution> </executions>
