This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f56f388fbbe Pipe IT: req size automatic fragmentation integration test 
(#13342)
f56f388fbbe is described below

commit f56f388fbbeaaa8a798622d03d4c2eb205910565
Author: Zhenyu Luo <[email protected]>
AuthorDate: Mon Sep 2 18:40:34 2024 +0800

    Pipe IT: req size automatic fragmentation integration test (#13342)
---
 .../it/env/cluster/config/MppCommonConfig.java     |  10 +
 .../env/cluster/config/MppSharedCommonConfig.java  |   8 +
 .../it/env/remote/config/RemoteCommonConfig.java   |   6 +
 .../org/apache/iotdb/itbase/env/CommonConfig.java  |   3 +
 .../pipe/it/autocreate/AbstractPipeDualAutoIT.java |   8 +-
 .../pipe/it/manual/AbstractPipeDualManualIT.java   |   8 +-
 .../pipe/it/manual/IoTDBPipeReqAutoSliceIT.java    | 482 +++++++++++++++++++++
 7 files changed, 519 insertions(+), 6 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
index 741d02d9bba..1760301ac73 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppCommonConfig.java
@@ -474,6 +474,16 @@ public class MppCommonConfig extends MppBaseConfig 
implements CommonConfig {
     return this;
   }
 
+  @Override
+  public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
+      int pipeConnectorRequestSliceThresholdBytes) {
+    setProperty(
+        "pipe_connector_request_slice_threshold_bytes",
+        String.valueOf(pipeConnectorRequestSliceThresholdBytes));
+
+    return this;
+  }
+
   // For part of the log directory
   public String getClusterConfigStr() {
     return 
fromConsensusFullNameToAbbr(properties.getProperty(CONFIG_NODE_CONSENSUS_PROTOCOL_CLASS))
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
index 7278fc0fb9e..969d4bb8d41 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppSharedCommonConfig.java
@@ -483,4 +483,12 @@ public class MppSharedCommonConfig implements CommonConfig 
{
     
cnConfig.setPipeMetaSyncerSyncIntervalMinutes(pipeMetaSyncerSyncIntervalMinutes);
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
+      int pipeConnectorRequestSliceThresholdBytes) {
+    
dnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes);
+    
cnConfig.setPipeConnectorRequestSliceThresholdBytes(pipeConnectorRequestSliceThresholdBytes);
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
index 238a85a3a4c..cee8aadf05f 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteCommonConfig.java
@@ -339,4 +339,10 @@ public class RemoteCommonConfig implements CommonConfig {
   public CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes) {
     return this;
   }
+
+  @Override
+  public CommonConfig setPipeConnectorRequestSliceThresholdBytes(
+      int pipeConnectorRequestSliceThresholdBytes) {
+    return this;
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
index d58c846f39e..d806be8db1d 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/CommonConfig.java
@@ -150,4 +150,7 @@ public interface CommonConfig {
   CommonConfig setPipeMetaSyncerInitialSyncDelayMinutes(long 
pipeMetaSyncerInitialSyncDelayMinutes);
 
   CommonConfig setPipeMetaSyncerSyncIntervalMinutes(long 
pipeMetaSyncerSyncIntervalMinutes);
+
+  CommonConfig setPipeConnectorRequestSliceThresholdBytes(
+      int pipeConnectorRequestSliceThresholdBytes);
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
index a0a27791a31..f2d3ec8cf6e 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java
@@ -36,7 +36,12 @@ abstract class AbstractPipeDualAutoIT {
     MultiEnvFactory.createEnv(2);
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
+    setupConfig();
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
 
+  protected void setupConfig() {
     // TODO: delete ratis configurations
     senderEnv
         .getConfig()
@@ -54,9 +59,6 @@ abstract class AbstractPipeDualAutoIT {
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
     receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
   }
 
   @After
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
index ae81f1ffb20..cdca06025fe 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/AbstractPipeDualManualIT.java
@@ -36,7 +36,12 @@ abstract class AbstractPipeDualManualIT {
     MultiEnvFactory.createEnv(2);
     senderEnv = MultiEnvFactory.getEnv(0);
     receiverEnv = MultiEnvFactory.getEnv(1);
+    setupConfig();
+    senderEnv.initClusterEnvironment();
+    receiverEnv.initClusterEnvironment();
+  }
 
+  protected void setupConfig() {
     // TODO: delete ratis configurations
     senderEnv
         .getConfig()
@@ -54,9 +59,6 @@ abstract class AbstractPipeDualManualIT {
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
     receiverEnv.getConfig().getCommonConfig().setCnConnectionTimeoutMs(600000);
-
-    senderEnv.initClusterEnvironment();
-    receiverEnv.initClusterEnvironment();
   }
 
   @After
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeReqAutoSliceIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeReqAutoSliceIT.java
new file mode 100644
index 00000000000..7ff87375905
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/manual/IoTDBPipeReqAutoSliceIT.java
@@ -0,0 +1,482 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.pipe.it.manual;
+
+import org.apache.iotdb.commons.utils.function.CheckedTriConsumer;
+import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.isession.ISession;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT2ManualCreateSchema;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT2ManualCreateSchema.class})
+public class IoTDBPipeReqAutoSliceIT extends AbstractPipeDualManualIT {
+  private static final int generateDataSize = 10;
+
+  @Override
+  protected void setupConfig() {
+    super.setupConfig();
+    
senderEnv.getConfig().getCommonConfig().setPipeConnectorRequestSliceThresholdBytes(4);
+    
receiverEnv.getConfig().getCommonConfig().setPipeConnectorRequestSliceThresholdBytes(4);
+  }
+
+  @Test
+  public void insertTablet() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          senderSession.insertTablet(tablet);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertTabletReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          senderSession.insertTablet(tablet);
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedTablet() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          senderSession.insertAlignedTablet(tablet);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedTabletReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          senderSession.insertAlignedTablet(tablet);
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertRecordsReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          senderSession.insertRecords(
+              getDeviceID(tablet), timestamps, pair.left, pair.right, values);
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertRecord() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertRecord(
+                tablet.getDeviceId(),
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i).toArray());
+          }
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertRecordReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertRecord(
+                tablet.getDeviceId(),
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i).toArray());
+          }
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedRecord() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertAlignedRecord(
+                tablet.getDeviceId(),
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i));
+          }
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedRecordReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          for (int i = 0; i < values.size(); i++) {
+            senderSession.insertAlignedRecord(
+                tablet.getDeviceId(),
+                timestamps.get(i),
+                pair.left.get(i),
+                pair.right.get(i),
+                values.get(i));
+          }
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertRecords() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          senderSession.insertRecords(
+              getDeviceID(tablet), timestamps, pair.left, pair.right, values);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedRecords() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          senderSession.insertAlignedRecords(
+              getDeviceID(tablet), timestamps, pair.left, pair.right, values);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedRecordsReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<Object>> values = 
generateTabletInsertRecordForTable(tablet);
+          senderSession.insertAlignedRecords(
+              getDeviceID(tablet), timestamps, pair.left, pair.right, values);
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertStringRecordsOfOneDevice() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<String>> values = 
generateTabletInsertStrRecordForTable(tablet);
+          senderSession.insertStringRecordsOfOneDevice(
+              tablet.getDeviceId(), timestamps, pair.left, values);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertStringRecordsOfOneDeviceReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<String>> values = 
generateTabletInsertStrRecordForTable(tablet);
+          senderSession.insertStringRecordsOfOneDevice(
+              tablet.getDeviceId(), timestamps, pair.left, values);
+        },
+        true);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedStringRecordsOfOneDevice() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<String>> values = 
generateTabletInsertStrRecordForTable(tablet);
+          senderSession.insertAlignedStringRecordsOfOneDevice(
+              tablet.getDeviceId(), timestamps, pair.left, values);
+        },
+        false);
+  }
+
+  @Ignore
+  @Test
+  public void insertAlignedStringRecordsOfOneDeviceReceiveByTsFile() {
+    prepareReqAutoSliceTest(
+        (ISession senderSession, ISession receiverSession, Tablet tablet) -> {
+          List<Long> timestamps = getTimestampList(tablet);
+          Pair<List<List<String>>, List<List<TSDataType>>> pair =
+              getMeasurementSchemasAndType(tablet);
+          List<List<String>> values = 
generateTabletInsertStrRecordForTable(tablet);
+          senderSession.insertAlignedStringRecordsOfOneDevice(
+              tablet.getDeviceId(), timestamps, pair.left, values);
+        },
+        true);
+  }
+
+  private void prepareReqAutoSliceTest(
+      CheckedTriConsumer<ISession, ISession, Tablet, Exception> consumer, 
boolean isTsFile) {
+    Tablet tablet = createTablet();
+    createTimeSeries();
+    try (ISession senderSession = senderEnv.getSessionConnection();
+        ISession receiverSession = receiverEnv.getSessionConnection()) {
+      if (isTsFile) {
+        consumer.accept(senderSession, receiverSession, tablet);
+        senderSession.executeNonQueryStatement("flush");
+        Thread.sleep(2000);
+        createPipe(senderSession, true);
+      } else {
+        createPipe(senderSession, false);
+        Thread.sleep(2000);
+        consumer.accept(senderSession, receiverSession, tablet);
+        senderSession.executeNonQueryStatement("flush");
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail();
+    }
+    verify(tablet);
+  }
+
+  private void createPipe(ISession session, boolean isTsFile)
+      throws IoTDBConnectionException, StatementExecutionException {
+    session.executeNonQueryStatement(
+        String.format(
+            "create pipe test"
+                + " with source 
('source'='iotdb-source','source.path'='root.test.**')"
+                + " with sink 
('sink'='iotdb-thrift-sync-sink','node-urls'='%s:%s','batch.enable'='false','sink.format'='%s')",
+            receiverEnv.getIP(), receiverEnv.getPort(), isTsFile ? "tsfile" : 
"tablet"));
+  }
+
+  private int[] createTestDataForInt32() {
+    int[] data = new int[generateDataSize];
+    Random random = new Random();
+    for (int i = 0; i < generateDataSize; i++) {
+      data[i] = random.nextInt();
+    }
+    return data;
+  }
+
+  private long[] createTestDataForInt64() {
+    long[] data = new long[generateDataSize];
+    long time = System.currentTimeMillis();
+    for (int i = 0; i < generateDataSize; i++) {
+      data[i] = time + i;
+    }
+    return data;
+  }
+
+  private void verify(Tablet tablet) {
+    HashSet<String> set = new HashSet<>();
+    for (int i = 0; i < generateDataSize; i++) {
+      set.add(
+          String.format(
+              "%d,%d,%d,",
+              tablet.timestamps[i], ((int[]) tablet.values[0])[i], ((int[]) 
tablet.values[1])[i]));
+    }
+    TestUtils.assertDataEventuallyOnEnv(
+        receiverEnv,
+        "select * from root.test.** ORDER BY time ASC",
+        "Time,root.test.db.temperature,root.test.db.status,",
+        set,
+        20);
+  }
+
+  private void createTimeSeries() {
+    List<String> timeSeriesCreation =
+        Arrays.asList(
+            "create timeseries root.test.db.status with 
datatype=INT32,encoding=PLAIN",
+            "create timeseries root.test.db.temperature with 
datatype=INT32,encoding=PLAIN");
+    TestUtils.tryExecuteNonQueriesWithRetry(senderEnv, timeSeriesCreation);
+    TestUtils.tryExecuteNonQueriesWithRetry(receiverEnv, timeSeriesCreation);
+  }
+
+  private Tablet createTablet() {
+    long[] timestamp = createTestDataForInt64();
+    int[] temperature = createTestDataForInt32();
+    int[] status = createTestDataForInt32();
+
+    Object[] objects = new Object[2];
+    objects[0] = temperature;
+    objects[1] = status;
+
+    List<IMeasurementSchema> measurementSchemas = new ArrayList<>(2);
+    measurementSchemas.add(new MeasurementSchema("temperature", 
TSDataType.INT32));
+    measurementSchemas.add(new MeasurementSchema("status", TSDataType.INT32));
+
+    BitMap[] bitMaps = new BitMap[2];
+    for (int i = 0; i < bitMaps.length; i++) {
+      bitMaps[i] = new BitMap(generateDataSize);
+    }
+
+    return new Tablet(
+        "root.test.db", measurementSchemas, timestamp, objects, bitMaps, 
generateDataSize);
+  }
+
+  private List<Long> getTimestampList(Tablet tablet) {
+    long[] timestamps = tablet.timestamps;
+    List<Long> data = new ArrayList<>(timestamps.length);
+    for (long timestamp : timestamps) {
+      data.add(timestamp);
+    }
+    return data;
+  }
+
+  private Pair<List<List<String>>, List<List<TSDataType>>> 
getMeasurementSchemasAndType(
+      Tablet tablet) {
+    List<List<String>> schemaData = new ArrayList<>(tablet.rowSize);
+    List<List<TSDataType>> typeData = new ArrayList<>(tablet.rowSize);
+    List<String> measurementSchemas = new 
ArrayList<>(tablet.getSchemas().size());
+    List<TSDataType> types = new ArrayList<>(tablet.rowSize);
+    for (IMeasurementSchema measurementSchema : tablet.getSchemas()) {
+      measurementSchemas.add(measurementSchema.getMeasurementId());
+      types.add(measurementSchema.getType());
+    }
+
+    for (int i = 0; i < tablet.rowSize; i++) {
+      schemaData.add(measurementSchemas);
+      typeData.add(types);
+    }
+
+    return new Pair<>(schemaData, typeData);
+  }
+
+  private List<String> getDeviceID(Tablet tablet) {
+    List<String> data = new ArrayList<>(tablet.rowSize);
+    for (int i = 0; i < tablet.rowSize; i++) {
+      data.add(tablet.getDeviceId());
+    }
+    return data;
+  }
+
+  private List<List<Object>> generateTabletInsertRecordForTable(final Tablet 
tablet) {
+    List<List<Object>> insertRecords = new ArrayList<>(tablet.rowSize);
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final Object[] values = tablet.values;
+    for (int i = 0; i < tablet.rowSize; i++) {
+      List<Object> insertRecord = new ArrayList<>();
+      for (int j = 0; j < schemas.size(); j++) {
+        switch (schemas.get(j).getType()) {
+          case INT64:
+          case TIMESTAMP:
+            insertRecord.add(((long[]) values[j])[i]);
+            break;
+          case INT32:
+            insertRecord.add(((int[]) values[j])[i]);
+            break;
+        }
+      }
+      insertRecords.add(insertRecord);
+    }
+
+    return insertRecords;
+  }
+
+  private List<List<String>> generateTabletInsertStrRecordForTable(Tablet 
tablet) {
+    List<List<String>> insertRecords = new ArrayList<>(tablet.rowSize);
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    final Object[] values = tablet.values;
+    for (int i = 0; i < tablet.rowSize; i++) {
+      List<String> insertRecord = new ArrayList<>();
+      for (int j = 0; j < schemas.size(); j++) {
+        switch (schemas.get(j).getType()) {
+          case INT64:
+            insertRecord.add(String.valueOf(((long[]) values[j])[i]));
+            break;
+          case INT32:
+            insertRecord.add(String.valueOf(((int[]) values[j])[i]));
+            break;
+        }
+      }
+      insertRecords.add(insertRecord);
+    }
+
+    return insertRecords;
+  }
+}

Reply via email to