This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch dev/1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push: new ab37cc16fc6 Pipe: Fixed the bug that sorter may use wrong value when there are duplicated timestamps & Refactor (#15488) (#15500) ab37cc16fc6 is described below commit ab37cc16fc671ac2eae42d2433a77cc3e42742c9 Author: Caideyipi <87789683+caidey...@users.noreply.github.com> AuthorDate: Thu May 15 16:48:57 2025 +0800 Pipe: Fixed the bug that sorter may use wrong value when there are duplicated timestamps & Refactor (#15488) (#15500) --- .../pipe/connector/util/PipeTabletEventSorter.java | 154 +++++++++++++-------- .../pipe/connector/PipeTabletEventSorterTest.java | 20 ++- 2 files changed, 104 insertions(+), 70 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java index 2a5e8769b59..ea4bc2789d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java @@ -35,14 +35,15 @@ public class PipeTabletEventSorter { private final Tablet tablet; private boolean isSorted = true; - private boolean isDeduplicated = true; + private boolean isDeDuplicated = true; private Integer[] index; - private int deduplicatedSize; + private int[] deDuplicatedIndex; + private int deDuplicatedSize; public PipeTabletEventSorter(final Tablet tablet) { this.tablet = tablet; - deduplicatedSize = tablet == null ? 0 : tablet.rowSize; + deDuplicatedSize = tablet == null ? 0 : tablet.rowSize; } public void deduplicateAndSortTimestampsIfNecessary() { @@ -58,19 +59,20 @@ public class PipeTabletEventSorter { isSorted = false; } if (currentTimestamp == previousTimestamp) { - isDeduplicated = false; + isDeDuplicated = false; } - if (!isSorted && !isDeduplicated) { + if (!isSorted && !isDeDuplicated) { break; } } - if (isSorted && isDeduplicated) { + if (isSorted && isDeDuplicated) { return; } index = new Integer[tablet.rowSize]; + deDuplicatedIndex = new int[tablet.rowSize]; for (int i = 0, size = tablet.rowSize; i < size; i++) { index[i] = i; } @@ -79,124 +81,160 @@ public class PipeTabletEventSorter { sortTimestamps(); // Do deduplicate anyway. - // isDeduplicated may be false positive when isSorted is false. + // isDeDuplicated may be false positive when isSorted is false. deduplicateTimestamps(); - isDeduplicated = true; + isDeDuplicated = true; } - if (!isDeduplicated) { + if (!isDeDuplicated) { deduplicateTimestamps(); } - sortAndDeduplicateValuesAndBitMaps(); + sortAndMayDeduplicateValuesAndBitMaps(); } private void sortTimestamps() { + // Index is sorted stably because it is Integer[] Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i])); Arrays.sort(tablet.timestamps, 0, tablet.rowSize); } private void deduplicateTimestamps() { - deduplicatedSize = 1; + deDuplicatedSize = 0; + long[] timestamps = tablet.timestamps; for (int i = 1, size = tablet.rowSize; i < size; i++) { - if (tablet.timestamps[i] != tablet.timestamps[i - 1]) { - index[deduplicatedSize] = index[i]; - tablet.timestamps[deduplicatedSize] = tablet.timestamps[i]; + if (timestamps[i] != timestamps[i - 1]) { + deDuplicatedIndex[deDuplicatedSize] = i - 1; + timestamps[deDuplicatedSize] = timestamps[i - 1]; - ++deduplicatedSize; + ++deDuplicatedSize; } } - tablet.rowSize = deduplicatedSize; + + deDuplicatedIndex[deDuplicatedSize] = tablet.rowSize - 1; + timestamps[deDuplicatedSize] = timestamps[tablet.rowSize - 1]; + tablet.rowSize = ++deDuplicatedSize; } - private void sortAndDeduplicateValuesAndBitMaps() { + // Input: + // Col: [1, null, 3, 6, null] + // Timestamp: [2, 1, 1, 1, 1] + // Intermediate: + // Index: [1, 2, 3, 4, 0] + // SortedTimestamp: [1, 2] + // DeduplicateIndex: [3, 4] + // Output: + // (Used index: [2(3), 4(0)]) + // Col: [6, 1] + private void sortAndMayDeduplicateValuesAndBitMaps() { int columnIndex = 0; for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { final IMeasurementSchema schema = tablet.getSchemas().get(i); if (schema != null) { + BitMap deDuplicatedBitMap = null; + BitMap originalBitMap = null; + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { + originalBitMap = tablet.bitMaps[columnIndex]; + deDuplicatedBitMap = new BitMap(originalBitMap.getSize()); + } + tablet.values[columnIndex] = - reorderValueList(deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); + reorderValueListAndBitMap( + tablet.values[columnIndex], schema.getType(), originalBitMap, deDuplicatedBitMap); + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { - tablet.bitMaps[columnIndex] = - reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], index); + tablet.bitMaps[columnIndex] = deDuplicatedBitMap; } columnIndex++; } } } - private static Object reorderValueList( - int deduplicatedSize, + private Object reorderValueListAndBitMap( final Object valueList, final TSDataType dataType, - final Integer[] index) { + final BitMap originalBitMap, + final BitMap deDuplicatedBitMap) { switch (dataType) { case BOOLEAN: final boolean[] boolValues = (boolean[]) valueList; - final boolean[] deduplicatedBoolValues = new boolean[boolValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedBoolValues[i] = boolValues[index[i]]; + final boolean[] deDuplicatedBoolValues = new boolean[boolValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedBoolValues[i] = + boolValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedBoolValues; + return deDuplicatedBoolValues; case INT32: final int[] intValues = (int[]) valueList; - final int[] deduplicatedIntValues = new int[intValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedIntValues[i] = intValues[index[i]]; + final int[] deDuplicatedIntValues = new int[intValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedIntValues[i] = + intValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedIntValues; + return deDuplicatedIntValues; case DATE: final LocalDate[] dateValues = (LocalDate[]) valueList; - final LocalDate[] deduplicatedDateValues = new LocalDate[dateValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedDateValues[i] = dateValues[index[i]]; + final LocalDate[] deDuplicatedDateValues = new LocalDate[dateValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedDateValues[i] = + dateValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedDateValues; + return deDuplicatedDateValues; case INT64: case TIMESTAMP: final long[] longValues = (long[]) valueList; - final long[] deduplicatedLongValues = new long[longValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedLongValues[i] = longValues[index[i]]; + final long[] deDuplicatedLongValues = new long[longValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedLongValues[i] = + longValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedLongValues; + return deDuplicatedLongValues; case FLOAT: final float[] floatValues = (float[]) valueList; - final float[] deduplicatedFloatValues = new float[floatValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedFloatValues[i] = floatValues[index[i]]; + final float[] deDuplicatedFloatValues = new float[floatValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedFloatValues[i] = + floatValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedFloatValues; + return deDuplicatedFloatValues; case DOUBLE: final double[] doubleValues = (double[]) valueList; - final double[] deduplicatedDoubleValues = new double[doubleValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedDoubleValues[i] = doubleValues[index[i]]; + final double[] deDuplicatedDoubleValues = new double[doubleValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedDoubleValues[i] = + doubleValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedDoubleValues; + return deDuplicatedDoubleValues; case TEXT: case BLOB: case STRING: final Binary[] binaryValues = (Binary[]) valueList; - final Binary[] deduplicatedBinaryValues = new Binary[binaryValues.length]; - for (int i = 0; i < deduplicatedSize; i++) { - deduplicatedBinaryValues[i] = binaryValues[index[i]]; + final Binary[] deDuplicatedBinaryValues = new Binary[binaryValues.length]; + for (int i = 0; i < deDuplicatedSize; i++) { + deDuplicatedBinaryValues[i] = + binaryValues[getLastNonnullIndex(i, originalBitMap, deDuplicatedBitMap)]; } - return deduplicatedBinaryValues; + return deDuplicatedBinaryValues; default: throw new UnSupportedDataTypeException( String.format("Data type %s is not supported.", dataType)); } } - private static BitMap reorderBitMap( - int deduplicatedSize, final BitMap bitMap, final Integer[] index) { - final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize()); - for (int i = 0; i < deduplicatedSize; i++) { - if (bitMap.isMarked(index[i])) { - deduplicatedBitMap.mark(i); + private int getLastNonnullIndex( + final int i, final BitMap originalBitMap, final BitMap deDuplicatedBitMap) { + if (originalBitMap == null) { + return index[deDuplicatedIndex[i]]; + } + int lastNonnullIndex = deDuplicatedIndex[i]; + final int lastIndex = i > 0 ? deDuplicatedIndex[i - 1] : -1; + while (originalBitMap.isMarked(index[lastNonnullIndex])) { + --lastNonnullIndex; + if (lastNonnullIndex == lastIndex) { + deDuplicatedBitMap.mark(i); + return index[lastNonnullIndex + 1]; } } - return deduplicatedBitMap; + return index[lastNonnullIndex]; } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java index e58bcf1c294..f5b1276af6c 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java @@ -112,10 +112,13 @@ public class PipeTabletEventSorterTest { long timestamp = 300; for (long i = 0; i < 10; i++) { - int rowIndex = tablet.rowSize++; + final int rowIndex = tablet.rowSize++; tablet.addTimestamp(rowIndex, timestamp); for (int s = 0; s < 3; s++) { - tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp); + tablet.addValue( + schemaList.get(s).getMeasurementId(), + rowIndex, + (i + s) % 3 != 0 ? timestamp + i : null); } } @@ -133,16 +136,9 @@ public class PipeTabletEventSorterTest { Assert.assertEquals(indices.size(), tablet.rowSize); final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize); - for (int i = 0; i < 3; ++i) { - Assert.assertArrayEquals( - timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize)); - } - - for (int i = 1; i < tablet.rowSize; ++i) { - Assert.assertTrue(timestamps[i] > timestamps[i - 1]); - for (int j = 0; j < 3; ++j) { - Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) tablet.values[j])[i - 1]); - } + Assert.assertEquals(timestamps[0] + 8, ((long[]) tablet.values[0])[0]); + for (int i = 1; i < 3; ++i) { + Assert.assertEquals(timestamps[0] + 9, ((long[]) tablet.values[i])[0]); } }