This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch yym_rpc_compress
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit b87fd7cf578e694508027bdfa821af08333fd9a1
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Jul 3 12:30:27 2025 +0800

    add test
---
 .../iotdb/session/it/IoTDBSessionCompressedIT.java | 322 +++++++++++++++++++++
 .../java/org/apache/iotdb/session/Session.java     |   2 +-
 .../iotdb/session/rpccompress/TabletEncoder.java   |  13 +-
 3 files changed, 331 insertions(+), 6 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionCompressedIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionCompressedIT.java
new file mode 100644
index 00000000000..23a3ac0e5a1
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/session/it/IoTDBSessionCompressedIT.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.session.it;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.TableSessionBuilder;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.RowRecord;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class IoTDBSessionCompressedIT {
+
+  private static ITableSession session1;
+  private static ITableSession session2;
+  private static ITableSession session3;
+  private static ITableSession session4;
+
+  @BeforeClass
+  public static void setUpClass() throws IoTDBConnectionException {
+    EnvFactory.getEnv().initClusterEnvironment();
+
+    List<String> nodeUrls =
+        EnvFactory.getEnv().getDataNodeWrapperList().stream()
+            .map(AbstractNodeWrapper::getIpAndPortString)
+            .collect(Collectors.toList());
+    session1 =
+        new TableSessionBuilder()
+            .nodeUrls(nodeUrls)
+            
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
+            
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
+            .enableCompression(false)
+            .enableRedirection(true)
+            .enableAutoFetch(false)
+            .isCompressed(true)
+            .withCompressionType(CompressionType.SNAPPY)
+            .withBooleanEncoding(TSEncoding.PLAIN)
+            .withInt32Encoding(TSEncoding.CHIMP)
+            .withInt64Encoding(TSEncoding.CHIMP)
+            .withFloatEncoding(TSEncoding.CHIMP)
+            .withDoubleEncoding(TSEncoding.CHIMP)
+            .withBlobEncoding(TSEncoding.PLAIN)
+            .withStringEncoding(TSEncoding.PLAIN)
+            .withTextEncoding(TSEncoding.PLAIN)
+            .withDateEncoding(TSEncoding.PLAIN)
+            .withTimeStampEncoding(TSEncoding.PLAIN)
+            .build();
+    session2 =
+        new TableSessionBuilder()
+            .nodeUrls(nodeUrls)
+            
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
+            
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
+            .enableCompression(false)
+            .enableRedirection(true)
+            .enableAutoFetch(false)
+            .isCompressed(true)
+            .withCompressionType(CompressionType.SNAPPY)
+            .withBooleanEncoding(TSEncoding.PLAIN)
+            .withInt32Encoding(TSEncoding.SPRINTZ)
+            .withInt64Encoding(TSEncoding.SPRINTZ)
+            .withFloatEncoding(TSEncoding.RLBE)
+            .withDoubleEncoding(TSEncoding.RLBE)
+            .withBlobEncoding(TSEncoding.PLAIN)
+            .withStringEncoding(TSEncoding.PLAIN)
+            .withTextEncoding(TSEncoding.PLAIN)
+            .withDateEncoding(TSEncoding.PLAIN)
+            .withTimeStampEncoding(TSEncoding.SPRINTZ)
+            .build();
+    session3 =
+        new TableSessionBuilder()
+            .nodeUrls(nodeUrls)
+            
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
+            
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
+            .enableCompression(false)
+            .enableRedirection(true)
+            .enableAutoFetch(false)
+            .isCompressed(true)
+            .withCompressionType(CompressionType.GZIP)
+            .withBooleanEncoding(TSEncoding.RLE)
+            .withInt32Encoding(TSEncoding.TS_2DIFF)
+            .withInt64Encoding(TSEncoding.RLE)
+            .withFloatEncoding(TSEncoding.TS_2DIFF)
+            .withDoubleEncoding(TSEncoding.RLE)
+            .withBlobEncoding(TSEncoding.PLAIN)
+            .withStringEncoding(TSEncoding.PLAIN)
+            .withTextEncoding(TSEncoding.PLAIN)
+            .withDateEncoding(TSEncoding.RLE)
+            .withTimeStampEncoding(TSEncoding.RLE)
+            .build();
+    session4 =
+        new TableSessionBuilder()
+            .nodeUrls(nodeUrls)
+            
.username(CommonDescriptor.getInstance().getConfig().getAdminName())
+            
.password(CommonDescriptor.getInstance().getConfig().getAdminPassword())
+            .enableCompression(false)
+            .enableRedirection(true)
+            .enableAutoFetch(false)
+            .isCompressed(true)
+            .withCompressionType(CompressionType.LZMA2)
+            .withBooleanEncoding(TSEncoding.PLAIN)
+            .withInt32Encoding(TSEncoding.GORILLA)
+            .withInt64Encoding(TSEncoding.ZIGZAG)
+            .withFloatEncoding(TSEncoding.GORILLA)
+            .withDoubleEncoding(TSEncoding.GORILLA)
+            .withBlobEncoding(TSEncoding.PLAIN)
+            .withStringEncoding(TSEncoding.PLAIN)
+            .withTextEncoding(TSEncoding.PLAIN)
+            .withDateEncoding(TSEncoding.RLE)
+            .withTimeStampEncoding(TSEncoding.ZIGZAG)
+            .build();
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws IoTDBConnectionException {
+    if (session1 != null) {
+      session1.close();
+    }
+    if (session2 != null) {
+      session2.close();
+    }
+    if (session3 != null) {
+      session3.close();
+    }
+    if (session4 != null) {
+      session4.close();
+    }
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  @Test
+  public void testRpcCompressed() throws IoTDBConnectionException, 
StatementExecutionException {
+    List<IMeasurementSchema> schemas = new ArrayList<>();
+    MeasurementSchema schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure0");
+    schema.setDataType(TSDataType.INT32);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure1");
+    schema.setDataType(TSDataType.INT64);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure2");
+    schema.setDataType(TSDataType.FLOAT);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure3");
+    schema.setDataType(TSDataType.DOUBLE);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure4");
+    schema.setDataType(TSDataType.TEXT);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure5");
+    schema.setDataType(TSDataType.BOOLEAN);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure6");
+    schema.setDataType(TSDataType.STRING);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+    schema = new MeasurementSchema();
+    schema.setMeasurementName("pressure7");
+    schema.setDataType(TSDataType.BLOB);
+    schema.setCompressionType(CompressionType.SNAPPY);
+    schema.setEncoding(TSEncoding.PLAIN);
+    schemas.add(schema);
+
+    long[] timestamp = new long[] {3L, 4L, 5L, 6L};
+    Object[] values = new Object[8];
+    values[0] = new int[] {1, 2, 8, 15};
+    values[1] = new long[] {1L, 2L, 8L, 15L};
+    values[2] = new float[] {1.1f, 1.2f, 8.8f, 15.5f};
+    values[3] = new double[] {0.707, 0.708, 8.8, 15.5};
+    values[4] =
+        new Binary[] {
+          new Binary(new byte[] {(byte) 32}),
+          new Binary(new byte[] {(byte) 16}),
+          new Binary(new byte[] {(byte) 1}),
+          new Binary(new byte[] {(byte) 56})
+        };
+    values[5] = new boolean[] {true, false, true, false};
+    values[6] =
+        new Binary[] {
+          new Binary(new byte[] {(byte) 32}),
+          new Binary(new byte[] {(byte) 16}),
+          new Binary(new byte[] {(byte) 1}),
+          new Binary(new byte[] {(byte) 56})
+        };
+    values[7] =
+        new Binary[] {
+          new Binary(new byte[] {(byte) 32}),
+          new Binary(new byte[] {(byte) 16}),
+          new Binary(new byte[] {(byte) 1}),
+          new Binary(new byte[] {(byte) 56})
+        };
+    BitMap[] partBitMap = new BitMap[8];
+
+    String tableName = "table_13";
+    Tablet tablet = new Tablet(tableName, schemas, timestamp, values, 
partBitMap, 4);
+
+    session1.executeNonQueryStatement("create database IF NOT EXISTS 
dbTest_0");
+    session1.executeNonQueryStatement("use dbTest_0");
+    session2.executeNonQueryStatement("use dbTest_0");
+    session3.executeNonQueryStatement("use dbTest_0");
+    session4.executeNonQueryStatement("use dbTest_0");
+
+    // 1. insert
+    session1.insert(tablet);
+    session2.insert(tablet);
+    session3.insert(tablet);
+    session4.insert(tablet);
+
+    // 2. assert
+    SessionDataSet sessionDataSet1 =
+        session1.executeQueryStatement("select * from dbTest_0." + tableName);
+    SessionDataSet sessionDataSet2 =
+        session2.executeQueryStatement("select * from dbTest_0." + tableName);
+    SessionDataSet sessionDataSet3 =
+        session3.executeQueryStatement("select * from dbTest_0." + tableName);
+    SessionDataSet sessionDataSet4 =
+        session4.executeQueryStatement("select * from dbTest_0." + tableName);
+
+    if (sessionDataSet1.hasNext()) {
+      RowRecord next = sessionDataSet1.next();
+      Assert.assertEquals(3L, next.getFields().get(0).getLongV());
+      Assert.assertEquals(1, next.getFields().get(1).getIntV());
+      Assert.assertEquals(1L, next.getFields().get(2).getLongV());
+      Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
+      Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(5).getBinaryV());
+      Assert.assertEquals(true, next.getFields().get(6).getBoolV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(7).getBinaryV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(8).getBinaryV());
+    }
+    if (sessionDataSet2.hasNext()) {
+      RowRecord next = sessionDataSet2.next();
+      Assert.assertEquals(3L, next.getFields().get(0).getLongV());
+      Assert.assertEquals(1, next.getFields().get(1).getIntV());
+      Assert.assertEquals(1L, next.getFields().get(2).getLongV());
+      Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
+      Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(5).getBinaryV());
+      Assert.assertEquals(true, next.getFields().get(6).getBoolV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(7).getBinaryV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(8).getBinaryV());
+    }
+    if (sessionDataSet3.hasNext()) {
+      RowRecord next = sessionDataSet3.next();
+      Assert.assertEquals(3L, next.getFields().get(0).getLongV());
+      Assert.assertEquals(1, next.getFields().get(1).getIntV());
+      Assert.assertEquals(1L, next.getFields().get(2).getLongV());
+      Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
+      Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(5).getBinaryV());
+      Assert.assertEquals(true, next.getFields().get(6).getBoolV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(7).getBinaryV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(8).getBinaryV());
+    }
+    if (sessionDataSet4.hasNext()) {
+      RowRecord next = sessionDataSet4.next();
+      Assert.assertEquals(3L, next.getFields().get(0).getLongV());
+      Assert.assertEquals(1, next.getFields().get(1).getIntV());
+      Assert.assertEquals(1L, next.getFields().get(2).getLongV());
+      Assert.assertEquals(1.1f, next.getFields().get(3).getFloatV(), 0.01);
+      Assert.assertEquals(0.707, next.getFields().get(4).getDoubleV(), 0.01);
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(5).getBinaryV());
+      Assert.assertEquals(true, next.getFields().get(6).getBoolV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(7).getBinaryV());
+      Assert.assertEquals(new Binary(new byte[] {(byte) 32}), 
next.getFields().get(8).getBinaryV());
+    }
+  }
+}
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
index 2340003ae1d..647bc76e466 100644
--- a/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
+++ b/iotdb-client/session/src/main/java/org/apache/iotdb/session/Session.java
@@ -3015,7 +3015,7 @@ public class Session implements ISession {
 
     TabletEncoder encoder =
         new TabletEncoder(
-            this.compressionType,
+            enableRPCCompression ? this.compressionType : 
CompressionType.UNCOMPRESSED,
             
encodingTypes.stream().map(TSEncoding::deserialize).collect(Collectors.toList()));
 
     request.setIsCompressed(this.enableRPCCompression);
diff --git 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/rpccompress/TabletEncoder.java
 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/rpccompress/TabletEncoder.java
index 3c8a97416e4..7ad02a53bbc 100644
--- 
a/iotdb-client/session/src/main/java/org/apache/iotdb/session/rpccompress/TabletEncoder.java
+++ 
b/iotdb-client/session/src/main/java/org/apache/iotdb/session/rpccompress/TabletEncoder.java
@@ -100,12 +100,15 @@ public class TabletEncoder {
   private ByteBuffer compressBuffer(ByteBuffer buffer) {
     if (compressionType != CompressionType.UNCOMPRESSED) {
       ICompressor compressor = ICompressor.getCompressor(compressionType);
-      ByteBuffer compressed =
-          
ByteBuffer.allocate(compressor.getMaxBytesForCompression(buffer.remaining()));
+      byte[] compressed = new 
byte[compressor.getMaxBytesForCompression(buffer.remaining())];
       try {
-        compressor.compress(buffer, compressed);
-        buffer = compressed;
-        buffer.flip();
+        int compressedLength =
+            compressor.compress(
+                buffer.array(),
+                buffer.arrayOffset() + buffer.position(),
+                buffer.remaining(),
+                compressed);
+        buffer = ByteBuffer.wrap(compressed, 0, compressedLength);
       } catch (IOException e) {
         throw new IllegalStateException(e);
       }

Reply via email to