This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ab37cc16fc6 Pipe: Fixed the bug that sorter may use wrong value when there are duplicated timestamps & Refactor (#15488) (#15500)
ab37cc16fc6 is described below

commit ab37cc16fc671ac2eae42d2433a77cc3e42742c9
Author: Caideyipi <87789683+caidey...@users.noreply.github.com>
AuthorDate: Thu May 15 16:48:57 2025 +0800

    Pipe: Fixed the bug that sorter may use wrong value when there are duplicated timestamps & Refactor (#15488) (#15500)
---
 .../pipe/connector/util/PipeTabletEventSorter.java | 154 +++++++++++++--------
 .../pipe/connector/PipeTabletEventSorterTest.java  |  20 ++-
 2 files changed, 104 insertions(+), 70 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
index 2a5e8769b59..ea4bc2789d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
@@ -35,14 +35,15 @@ public class PipeTabletEventSorter {
   private final Tablet tablet;
 
   private boolean isSorted = true;
-  private boolean isDeduplicated = true;
+  private boolean isDeDuplicated = true;
 
   private Integer[] index;
-  private int deduplicatedSize;
+  private int[] deDuplicatedIndex;
+  private int deDuplicatedSize;
 
   public PipeTabletEventSorter(final Tablet tablet) {
     this.tablet = tablet;
-    deduplicatedSize = tablet == null ? 0 : tablet.rowSize;
+    deDuplicatedSize = tablet == null ? 0 : tablet.rowSize;
   }
 
   public void deduplicateAndSortTimestampsIfNecessary() {
@@ -58,19 +59,20 @@ public class PipeTabletEventSorter {
         isSorted = false;
       }
       if (currentTimestamp == previousTimestamp) {
-        isDeduplicated = false;
+        isDeDuplicated = false;
       }
 
-      if (!isSorted && !isDeduplicated) {
+      if (!isSorted && !isDeDuplicated) {
         break;
       }
     }
 
-    if (isSorted && isDeduplicated) {
+    if (isSorted && isDeDuplicated) {
       return;
     }
 
     index = new Integer[tablet.rowSize];
+    deDuplicatedIndex = new int[tablet.rowSize];
     for (int i = 0, size = tablet.rowSize; i < size; i++) {
       index[i] = i;
     }
@@ -79,124 +81,160 @@ public class PipeTabletEventSorter {
       sortTimestamps();
 
       // Do deduplicate anyway.
-      // isDeduplicated may be false positive when isSorted is false.
+      // isDeDuplicated may be false positive when isSorted is false.
       deduplicateTimestamps();
-      isDeduplicated = true;
+      isDeDuplicated = true;
     }
 
-    if (!isDeduplicated) {
+    if (!isDeDuplicated) {
       deduplicateTimestamps();
     }
 
-    sortAndDeduplicateValuesAndBitMaps();
+    sortAndMayDeduplicateValuesAndBitMaps();
   }
 
   private void sortTimestamps() {
+    // Index is sorted stably because it is Integer[]
     Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i]));
     Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
   }
 
   private void deduplicateTimestamps() {
-    deduplicatedSize = 1;
+    deDuplicatedSize = 0;
+    long[] timestamps = tablet.timestamps;
     for (int i = 1, size = tablet.rowSize; i < size; i++) {
-      if (tablet.timestamps[i] != tablet.timestamps[i - 1]) {
-        index[deduplicatedSize] = index[i];
-        tablet.timestamps[deduplicatedSize] = tablet.timestamps[i];
+      if (timestamps[i] != timestamps[i - 1]) {
+        deDuplicatedIndex[deDuplicatedSize] = i - 1;
+        timestamps[deDuplicatedSize] = timestamps[i - 1];
 
-        ++deduplicatedSize;
+        ++deDuplicatedSize;
       }
     }
-    tablet.rowSize = deduplicatedSize;
+
+    deDuplicatedIndex[deDuplicatedSize] = tablet.rowSize - 1;
+    timestamps[deDuplicatedSize] = timestamps[tablet.rowSize - 1];
+    tablet.rowSize = ++deDuplicatedSize;
   }
 
-  private void sortAndDeduplicateValuesAndBitMaps() {
+  // Input:
+  // Col: [1, null, 3, 6, null]
+  // Timestamp: [2, 1, 1, 1, 1]
+  // Intermediate:
+  // Index: [1, 2, 3, 4, 0]
+  // SortedTimestamp: [1, 2]
+  // DeduplicateIndex: [3, 4]
+  // Output:
+  // (Used index: [2(3), 4(0)])
+  // Col: [6, 1]
+  private void sortAndMayDeduplicateValuesAndBitMaps() {
     int columnIndex = 0;
     for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
       final IMeasurementSchema schema = tablet.getSchemas().get(i);
       if (schema != null) {
+        BitMap deDuplicatedBitMap = null;
+        BitMap originalBitMap = null;
+        if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+          originalBitMap = tablet.bitMaps[columnIndex];
+          deDuplicatedBitMap = new BitMap(originalBitMap.getSize());
+        }
+
         tablet.values[columnIndex] =
-            reorderValueList(deduplicatedSize, tablet.values[columnIndex], 
schema.getType(), index);
+            reorderValueListAndBitMap(
+                tablet.values[columnIndex], schema.getType(), originalBitMap, 
deDuplicatedBitMap);
+
         if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
-          tablet.bitMaps[columnIndex] =
-              reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], 
index);
+          tablet.bitMaps[columnIndex] = deDuplicatedBitMap;
         }
         columnIndex++;
       }
     }
   }
 
-  private static Object reorderValueList(
-      int deduplicatedSize,
+  private Object reorderValueListAndBitMap(
       final Object valueList,
       final TSDataType dataType,
-      final Integer[] index) {
+      final BitMap originalBitMap,
+      final BitMap deDuplicatedBitMap) {
     switch (dataType) {
       case BOOLEAN:
         final boolean[] boolValues = (boolean[]) valueList;
-        final boolean[] deduplicatedBoolValues = new 
boolean[boolValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedBoolValues[i] = boolValues[index[i]];
+        final boolean[] deDuplicatedBoolValues = new 
boolean[boolValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedBoolValues[i] =
+              boolValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedBoolValues;
+        return deDuplicatedBoolValues;
       case INT32:
         final int[] intValues = (int[]) valueList;
-        final int[] deduplicatedIntValues = new int[intValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedIntValues[i] = intValues[index[i]];
+        final int[] deDuplicatedIntValues = new int[intValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedIntValues[i] =
+              intValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedIntValues;
+        return deDuplicatedIntValues;
       case DATE:
         final LocalDate[] dateValues = (LocalDate[]) valueList;
-        final LocalDate[] deduplicatedDateValues = new 
LocalDate[dateValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedDateValues[i] = dateValues[index[i]];
+        final LocalDate[] deDuplicatedDateValues = new 
LocalDate[dateValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedDateValues[i] =
+              dateValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedDateValues;
+        return deDuplicatedDateValues;
       case INT64:
       case TIMESTAMP:
         final long[] longValues = (long[]) valueList;
-        final long[] deduplicatedLongValues = new long[longValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedLongValues[i] = longValues[index[i]];
+        final long[] deDuplicatedLongValues = new long[longValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedLongValues[i] =
+              longValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedLongValues;
+        return deDuplicatedLongValues;
       case FLOAT:
         final float[] floatValues = (float[]) valueList;
-        final float[] deduplicatedFloatValues = new float[floatValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedFloatValues[i] = floatValues[index[i]];
+        final float[] deDuplicatedFloatValues = new float[floatValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedFloatValues[i] =
+              floatValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedFloatValues;
+        return deDuplicatedFloatValues;
       case DOUBLE:
         final double[] doubleValues = (double[]) valueList;
-        final double[] deduplicatedDoubleValues = new 
double[doubleValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedDoubleValues[i] = doubleValues[index[i]];
+        final double[] deDuplicatedDoubleValues = new 
double[doubleValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedDoubleValues[i] =
+              doubleValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedDoubleValues;
+        return deDuplicatedDoubleValues;
       case TEXT:
       case BLOB:
       case STRING:
         final Binary[] binaryValues = (Binary[]) valueList;
-        final Binary[] deduplicatedBinaryValues = new 
Binary[binaryValues.length];
-        for (int i = 0; i < deduplicatedSize; i++) {
-          deduplicatedBinaryValues[i] = binaryValues[index[i]];
+        final Binary[] deDuplicatedBinaryValues = new 
Binary[binaryValues.length];
+        for (int i = 0; i < deDuplicatedSize; i++) {
+          deDuplicatedBinaryValues[i] =
+              binaryValues[getLastNonnullIndex(i, originalBitMap, 
deDuplicatedBitMap)];
         }
-        return deduplicatedBinaryValues;
+        return deDuplicatedBinaryValues;
       default:
         throw new UnSupportedDataTypeException(
             String.format("Data type %s is not supported.", dataType));
     }
   }
 
-  private static BitMap reorderBitMap(
-      int deduplicatedSize, final BitMap bitMap, final Integer[] index) {
-    final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize());
-    for (int i = 0; i < deduplicatedSize; i++) {
-      if (bitMap.isMarked(index[i])) {
-        deduplicatedBitMap.mark(i);
+  private int getLastNonnullIndex(
+      final int i, final BitMap originalBitMap, final BitMap 
deDuplicatedBitMap) {
+    if (originalBitMap == null) {
+      return index[deDuplicatedIndex[i]];
+    }
+    int lastNonnullIndex = deDuplicatedIndex[i];
+    final int lastIndex = i > 0 ? deDuplicatedIndex[i - 1] : -1;
+    while (originalBitMap.isMarked(index[lastNonnullIndex])) {
+      --lastNonnullIndex;
+      if (lastNonnullIndex == lastIndex) {
+        deDuplicatedBitMap.mark(i);
+        return index[lastNonnullIndex + 1];
       }
     }
-    return deduplicatedBitMap;
+    return index[lastNonnullIndex];
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
index e58bcf1c294..f5b1276af6c 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
@@ -112,10 +112,13 @@ public class PipeTabletEventSorterTest {
 
     long timestamp = 300;
     for (long i = 0; i < 10; i++) {
-      int rowIndex = tablet.rowSize++;
+      final int rowIndex = tablet.rowSize++;
       tablet.addTimestamp(rowIndex, timestamp);
       for (int s = 0; s < 3; s++) {
-        tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, 
timestamp);
+        tablet.addValue(
+            schemaList.get(s).getMeasurementId(),
+            rowIndex,
+            (i + s) % 3 != 0 ? timestamp + i : null);
       }
     }
 
@@ -133,16 +136,9 @@ public class PipeTabletEventSorterTest {
     Assert.assertEquals(indices.size(), tablet.rowSize);
 
     final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, 
tablet.rowSize);
-    for (int i = 0; i < 3; ++i) {
-      Assert.assertArrayEquals(
-          timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, 
tablet.rowSize));
-    }
-
-    for (int i = 1; i < tablet.rowSize; ++i) {
-      Assert.assertTrue(timestamps[i] > timestamps[i - 1]);
-      for (int j = 0; j < 3; ++j) {
-        Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) 
tablet.values[j])[i - 1]);
-      }
+    Assert.assertEquals(timestamps[0] + 8, ((long[]) tablet.values[0])[0]);
+    for (int i = 1; i < 3; ++i) {
+      Assert.assertEquals(timestamps[0] + 9, ((long[]) tablet.values[i])[0]);
     }
   }
 

Reply via email to