This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch tsfile-batch-ordering
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 4aaa30745001fa633b7034ed78974365bb877d34
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 26 11:36:44 2024 +0800

    Pipe: Avoid writing out-of-order data in tsfile when `'sink.format' = 
'file'`
---
 .../evolvable/batch/PipeTabletEventSorter.java     | 196 +++++++++++++++++++
 .../batch/PipeTabletEventTsFileBatch.java          | 162 +++++++++++-----
 .../pipe/connector/PipeTabletEventSorterTest.java  | 215 +++++++++++++++++++++
 3 files changed, 521 insertions(+), 52 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java
new file mode 100644
index 00000000000..ca24972d798
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Comparator;
+
+public class PipeTabletEventSorter {
+
+  private final Tablet tablet;
+
+  private boolean isSorted = true;
+  private boolean isDeduplicated = true;
+
+  private Integer[] index;
+  private int deduplicatedSize;
+
+  public PipeTabletEventSorter(final Tablet tablet) {
+    this.tablet = tablet;
+    deduplicatedSize = tablet == null ? 0 : tablet.rowSize;
+  }
+
+  public void deduplicateAndSortTimestampsIfNecessary() {
+    if (tablet == null || tablet.rowSize == 0) {
+      return;
+    }
+
+    for (int i = 1, size = tablet.rowSize; i < size; ++i) {
+      final long currentTimestamp = tablet.timestamps[i];
+      final long previousTimestamp = tablet.timestamps[i - 1];
+
+      if (currentTimestamp < previousTimestamp) {
+        isSorted = false;
+      }
+      if (currentTimestamp == previousTimestamp) {
+        isDeduplicated = false;
+      }
+
+      if (!isSorted && !isDeduplicated) {
+        break;
+      }
+    }
+
+    if (isSorted && isDeduplicated) {
+      return;
+    }
+
+    index = new Integer[tablet.rowSize];
+    for (int i = 0, size = tablet.rowSize; i < size; i++) {
+      index[i] = i;
+    }
+
+    if (!isSorted) {
+      sortTimestamps();
+    }
+    if (!isDeduplicated) {
+      deduplicateTimestamps();
+    }
+    sortAndDeduplicateValuesAndBitMaps();
+  }
+
+  private void sortTimestamps() {
+    Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i]));
+    Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+  }
+
+  private void deduplicateTimestamps() {
+    deduplicatedSize = 1;
+    for (int i = 1, size = tablet.rowSize; i < size; i++) {
+      if (tablet.timestamps[i] != tablet.timestamps[i - 1]) {
+        index[deduplicatedSize] = index[i];
+        tablet.timestamps[deduplicatedSize] = tablet.timestamps[i];
+
+        ++deduplicatedSize;
+      }
+    }
+    tablet.rowSize = deduplicatedSize;
+  }
+
+  private void sortAndDeduplicateValuesAndBitMaps() {
+    int columnIndex = 0;
+    for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
+      final IMeasurementSchema schema = tablet.getSchemas().get(i);
+      if (schema != null) {
+        tablet.values[columnIndex] =
+            deduplicateValueList(
+                deduplicatedSize, tablet.values[columnIndex], 
schema.getType(), index);
+        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+          tablet.bitMaps[columnIndex] =
+              deduplicateBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], 
index);
+        }
+        columnIndex++;
+      }
+    }
+  }
+
+  private static Object deduplicateValueList(
+      int deduplicatedSize,
+      final Object valueList,
+      final TSDataType dataType,
+      final Integer[] index) {
+    switch (dataType) {
+      case BOOLEAN:
+        final boolean[] boolValues = (boolean[]) valueList;
+        final boolean[] deduplicatedBoolValues = new 
boolean[boolValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedBoolValues[i] = boolValues[index[i]];
+        }
+        return deduplicatedBoolValues;
+      case INT32:
+        final int[] intValues = (int[]) valueList;
+        final int[] deduplicatedIntValues = new int[intValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedIntValues[i] = intValues[index[i]];
+        }
+        return deduplicatedIntValues;
+      case DATE:
+        final LocalDate[] dateValues = (LocalDate[]) valueList;
+        final LocalDate[] deduplicatedDateValues = new 
LocalDate[dateValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedDateValues[i] = dateValues[index[i]];
+        }
+        return deduplicatedDateValues;
+      case INT64:
+      case TIMESTAMP:
+        final long[] longValues = (long[]) valueList;
+        final long[] deduplicatedLongValues = new long[longValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedLongValues[i] = longValues[index[i]];
+        }
+        return deduplicatedLongValues;
+      case FLOAT:
+        final float[] floatValues = (float[]) valueList;
+        final float[] deduplicatedFloatValues = new float[floatValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedFloatValues[i] = floatValues[index[i]];
+        }
+        return deduplicatedFloatValues;
+      case DOUBLE:
+        final double[] doubleValues = (double[]) valueList;
+        final double[] deduplicatedDoubleValues = new 
double[doubleValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedDoubleValues[i] = doubleValues[index[i]];
+        }
+        return deduplicatedDoubleValues;
+      case TEXT:
+      case BLOB:
+      case STRING:
+        final Binary[] binaryValues = (Binary[]) valueList;
+        final Binary[] deduplicatedBinaryValues = new 
Binary[binaryValues.length];
+        for (int i = 0; i < deduplicatedSize; i++) {
+          deduplicatedBinaryValues[i] = binaryValues[index[i]];
+        }
+        return deduplicatedBinaryValues;
+      default:
+        throw new UnSupportedDataTypeException(
+            String.format("Data type %s is not supported.", dataType));
+    }
+  }
+
+  private static BitMap deduplicateBitMap(
+      int deduplicatedSize, final BitMap bitMap, final Integer[] index) {
+    final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize());
+    for (int i = 0; i < deduplicatedSize; i++) {
+      if (bitMap.isMarked(index[i])) {
+        deduplicatedBitMap.mark(i);
+      }
+    }
+    return deduplicatedBitMap;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 6abca7b3183..0015db2e192 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -48,6 +48,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -165,7 +166,12 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
   }
 
   private void bufferTablet(
-      final String pipeName, long creationTime, Tablet tablet, boolean 
isAligned) {
+      final String pipeName,
+      final long creationTime,
+      final Tablet tablet,
+      final boolean isAligned) {
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
     totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
 
     pipeName2WeightMap.compute(
@@ -208,12 +214,19 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
           Comparator.comparingLong(tablet -> tablet.timestamps[0]));
     }
 
+    // Sort the devices by device id
+    List<String> devices = new ArrayList<>(device2Tablets.keySet());
+    devices.sort(Comparator.naturalOrder());
+
     // Replace ArrayList with LinkedList to improve performance
-    final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new 
HashMap<>();
-    for (final Map.Entry<String, List<Tablet>> entry : 
device2Tablets.entrySet()) {
-      device2TabletsLinkedList.put(entry.getKey(), new 
LinkedList<>(entry.getValue()));
+    final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList =
+        new LinkedHashMap<>();
+    for (final String device : devices) {
+      device2TabletsLinkedList.put(device, new 
LinkedList<>(device2Tablets.get(device)));
     }
-    // Clear the original device2Tablets to release memory
+
+    // Help GC
+    devices.clear();
     device2Tablets.clear();
 
     // Write the tablets to the tsfile device by device, and the tablets
@@ -241,56 +254,44 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
                         + TsFileConstant.TSFILE_SUFFIX));
       }
 
-      final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
-          device2TabletsLinkedList.entrySet().iterator();
-
-      while (iterator.hasNext()) {
-        final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
-        final String deviceId = entry.getKey();
-        final LinkedList<Tablet> tablets = entry.getValue();
-
-        final List<Tablet> tabletsToWrite = new ArrayList<>();
-
-        Tablet lastTablet = null;
-        while (!tablets.isEmpty()) {
-          final Tablet tablet = tablets.peekFirst();
-          if (Objects.isNull(lastTablet)
-              // lastTablet.rowSize is not 0
-              || lastTablet.timestamps[lastTablet.rowSize - 1] < 
tablet.timestamps[0]) {
-            tabletsToWrite.add(tablet);
-            lastTablet = tablet;
-            tablets.pollFirst();
-          } else {
-            break;
-          }
-        }
+      try {
+        tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, 
device2Aligned);
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Batch id = {}: Failed to write tablets into tsfile, because {}",
+            currentBatchId.get(),
+            e.getMessage(),
+            e);
 
-        if (tablets.isEmpty()) {
-          iterator.remove();
+        try {
+          fileWriter.close();
+
+          final File sealedFile = fileWriter.getIOWriter().getFile();
+          sealedFiles.add(sealedFile);
+          LOGGER.info(
+              "Batch id = {}: Seal tsfile {} successfully after failed to 
write tablets into it. ",
+              currentBatchId.get(),
+              sealedFile.getPath());
+
+          fileWriter = null;
+        } catch (final Exception sealException) {
+          LOGGER.warn(
+              "Batch id = {}: Failed to seal tsfile after failed to write 
tablets into it. ",
+              currentBatchId.get(),
+              sealException);
+
+          final boolean deleteSuccess = 
FileUtils.deleteQuietly(fileWriter.getIOWriter().getFile());
+          LOGGER.warn(
+              "Batch id = {}: Delete the tsfile {} after failed to seal it {}. 
"
+                  + "Maybe the tsfile needs to be deleted manually.",
+              currentBatchId.get(),
+              fileWriter.getIOWriter().getFile().getPath(),
+              deleteSuccess ? "successfully" : "unsuccessfully");
+
+          fileWriter = null;
         }
 
-        final boolean isAligned = device2Aligned.get(deviceId);
-        for (final Tablet tablet : tabletsToWrite) {
-          if (isAligned) {
-            try {
-              fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), 
tablet.getSchemas());
-            } catch (final WriteProcessException ignore) {
-              // Do nothing if the timeSeries has been registered
-            }
-
-            fileWriter.writeAligned(tablet);
-          } else {
-            for (final MeasurementSchema schema : tablet.getSchemas()) {
-              try {
-                fileWriter.registerTimeseries(new Path(tablet.deviceId), 
schema);
-              } catch (final WriteProcessException ignore) {
-                // Do nothing if the timeSeries has been registered
-              }
-            }
-
-            fileWriter.write(tablet);
-          }
-        }
+        throw e;
       }
 
       fileWriter.close();
@@ -308,6 +309,63 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
     return sealedFiles;
   }
 
+  private void tryBestToWriteTabletsIntoOneFile(
+      LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
+      Map<String, Boolean> device2Aligned)
+      throws IOException, WriteProcessException {
+    final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
+        device2TabletsLinkedList.entrySet().iterator();
+
+    while (iterator.hasNext()) {
+      final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
+      final String deviceId = entry.getKey();
+      final LinkedList<Tablet> tablets = entry.getValue();
+
+      final List<Tablet> tabletsToWrite = new ArrayList<>();
+
+      Tablet lastTablet = null;
+      while (!tablets.isEmpty()) {
+        final Tablet tablet = tablets.peekFirst();
+        if (Objects.isNull(lastTablet)
+            // lastTablet.rowSize is not 0
+            || lastTablet.timestamps[lastTablet.rowSize - 1] < 
tablet.timestamps[0]) {
+          tabletsToWrite.add(tablet);
+          lastTablet = tablet;
+          tablets.pollFirst();
+        } else {
+          break;
+        }
+      }
+
+      if (tablets.isEmpty()) {
+        iterator.remove();
+      }
+
+      final boolean isAligned = device2Aligned.get(deviceId);
+      for (final Tablet tablet : tabletsToWrite) {
+        if (isAligned) {
+          try {
+            fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), 
tablet.getSchemas());
+          } catch (final WriteProcessException ignore) {
+            // Do nothing if the timeSeries has been registered
+          }
+
+          fileWriter.writeAligned(tablet);
+        } else {
+          for (final MeasurementSchema schema : tablet.getSchemas()) {
+            try {
+              fileWriter.registerTimeseries(new Path(tablet.deviceId), schema);
+            } catch (final WriteProcessException ignore) {
+              // Do nothing if the timeSeries has been registered
+            }
+          }
+
+          fileWriter.write(tablet);
+        }
+      }
+    }
+  }
+
   @Override
   protected long getMaxBatchSizeInBytes() {
     return maxSizeInBytes;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
new file mode 100644
index 00000000000..728eb9f1c63
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector;
+
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventSorter;
+import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class PipeTabletEventSorterTest {
+
+  public static void main(String[] args) {
+    new PipeTabletEventSorterTest().testDeduplicateAndSort();
+    new PipeTabletEventSorterTest().testDeduplicate();
+    new PipeTabletEventSorterTest().testSort();
+  }
+
+  @Test
+  public void testDeduplicateAndSort() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp + i);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp + i);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp - i);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp - i);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertFalse(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+
+  @Test
+  public void testDeduplicate() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 10);
+
+    long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, timestamp);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 10; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+
+  @Test
+  public void testSort() {
+    List<MeasurementSchema> schemaList = new ArrayList<>();
+    schemaList.add(new MeasurementSchema("s1", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s2", TSDataType.INT64));
+    schemaList.add(new MeasurementSchema("s3", TSDataType.INT64));
+
+    Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    for (long i = 0; i < 10; i++) {
+      int rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex + 2);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex + 2);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex);
+      }
+
+      rowIndex = tablet.rowSize++;
+      tablet.addTimestamp(rowIndex, (long) rowIndex - 2);
+      for (int s = 0; s < 3; s++) {
+        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) 
rowIndex - 2);
+      }
+    }
+
+    Set<Integer> indices = new HashSet<>();
+    for (int i = 0; i < 30; i++) {
+      indices.add((int) tablet.timestamps[i]);
+    }
+
+    Assert.assertFalse(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] != timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] != ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+
+    new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet));
+
+    Assert.assertEquals(indices.size(), tablet.rowSize);
+
+    timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize);
+    for (int i = 0; i < 3; ++i) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
+    }
+
+    for (int i = 1; i < tablet.rowSize; ++i) {
+      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
+      for (int j = 0; j < 3; ++j) {
+        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
+      }
+    }
+  }
+}

Reply via email to