This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new c1df2bf8a65 Pipe: Avoid writing out-of-order data in tsfile when
`'sink.format' = 'file'` (#12810)
c1df2bf8a65 is described below
commit c1df2bf8a6579c702b4fd2e0a618d3499978a870
Author: Steve Yurong Su <[email protected]>
AuthorDate: Wed Jun 26 18:45:38 2024 +0800
Pipe: Avoid writing out-of-order data in tsfile when `'sink.format' =
'file'` (#12810)
---
.../batch/PipeTabletEventTsFileBatch.java | 159 ++++++++++-----
.../request/PipeTransferTabletRawReq.java | 133 +------------
.../pipe/connector/util/PipeTabletEventSorter.java | 202 +++++++++++++++++++
.../pipe/connector/PipeTabletEventSorterTest.java | 217 +++++++++++++++++++++
4 files changed, 530 insertions(+), 181 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 6abca7b3183..4864a124668 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -21,6 +21,7 @@ package
org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
@@ -48,6 +49,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -165,7 +167,12 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
}
private void bufferTablet(
- final String pipeName, long creationTime, Tablet tablet, boolean
isAligned) {
+ final String pipeName,
+ final long creationTime,
+ final Tablet tablet,
+ final boolean isAligned) {
+ new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
pipeName2WeightMap.compute(
@@ -208,12 +215,19 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
Comparator.comparingLong(tablet -> tablet.timestamps[0]));
}
+ // Sort the devices by device id
+ List<String> devices = new ArrayList<>(device2Tablets.keySet());
+ devices.sort(Comparator.naturalOrder());
+
// Replace ArrayList with LinkedList to improve performance
- final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new
HashMap<>();
- for (final Map.Entry<String, List<Tablet>> entry :
device2Tablets.entrySet()) {
- device2TabletsLinkedList.put(entry.getKey(), new
LinkedList<>(entry.getValue()));
+ final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList =
+ new LinkedHashMap<>();
+ for (final String device : devices) {
+ device2TabletsLinkedList.put(device, new
LinkedList<>(device2Tablets.get(device)));
}
- // Clear the original device2Tablets to release memory
+
+ // Help GC
+ devices.clear();
device2Tablets.clear();
// Write the tablets to the tsfile device by device, and the tablets
@@ -241,56 +255,44 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
+ TsFileConstant.TSFILE_SUFFIX));
}
- final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
- device2TabletsLinkedList.entrySet().iterator();
-
- while (iterator.hasNext()) {
- final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
- final String deviceId = entry.getKey();
- final LinkedList<Tablet> tablets = entry.getValue();
-
- final List<Tablet> tabletsToWrite = new ArrayList<>();
-
- Tablet lastTablet = null;
- while (!tablets.isEmpty()) {
- final Tablet tablet = tablets.peekFirst();
- if (Objects.isNull(lastTablet)
- // lastTablet.rowSize is not 0
- || lastTablet.timestamps[lastTablet.rowSize - 1] <
tablet.timestamps[0]) {
- tabletsToWrite.add(tablet);
- lastTablet = tablet;
- tablets.pollFirst();
- } else {
- break;
- }
- }
+ try {
+ tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList,
device2Aligned);
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Batch id = {}: Failed to write tablets into tsfile, because {}",
+ currentBatchId.get(),
+ e.getMessage(),
+ e);
- if (tablets.isEmpty()) {
- iterator.remove();
+ try {
+ fileWriter.close();
+ } catch (final Exception closeException) {
+ LOGGER.warn(
+ "Batch id = {}: Failed to close the tsfile {} after failed to
write tablets into, because {}",
+ currentBatchId.get(),
+ fileWriter.getIOWriter().getFile().getPath(),
+ closeException.getMessage(),
+ closeException);
+ } finally {
+ // Add current writing file to the list and delete the file
+ sealedFiles.add(fileWriter.getIOWriter().getFile());
}
- final boolean isAligned = device2Aligned.get(deviceId);
- for (final Tablet tablet : tabletsToWrite) {
- if (isAligned) {
- try {
- fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId),
tablet.getSchemas());
- } catch (final WriteProcessException ignore) {
- // Do nothing if the timeSeries has been registered
- }
+ for (final File sealedFile : sealedFiles) {
+ final boolean deleteSuccess = FileUtils.deleteQuietly(sealedFile);
+ LOGGER.warn(
+ "Batch id = {}: {} delete the tsfile {} after failed to write
tablets into {}. {}",
+ currentBatchId.get(),
+ deleteSuccess ? "Successfully" : "Failed to",
+ sealedFile.getPath(),
+ fileWriter.getIOWriter().getFile().getPath(),
+ deleteSuccess ? "" : "Maybe the tsfile needs to be deleted
manually.");
+ }
+ sealedFiles.clear();
- fileWriter.writeAligned(tablet);
- } else {
- for (final MeasurementSchema schema : tablet.getSchemas()) {
- try {
- fileWriter.registerTimeseries(new Path(tablet.deviceId),
schema);
- } catch (final WriteProcessException ignore) {
- // Do nothing if the timeSeries has been registered
- }
- }
+ fileWriter = null;
- fileWriter.write(tablet);
- }
- }
+ throw e;
}
fileWriter.close();
@@ -308,6 +310,63 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
return sealedFiles;
}
+ /**
+ * Writes, for every device in iteration order of the given map, the leading run of tablets
+ * whose time ranges do not overlap into {@code fileWriter}.
+ *
+ * <p>For each device the tablets are taken from the head of its list for as long as the first
+ * timestamp of the next tablet is strictly greater than the last timestamp of the previously
+ * taken one. Taken tablets are removed from the list, and a device whose list becomes empty is
+ * removed from the map. Tablets still present in the map when this method returns were not
+ * written by this call (presumably the caller writes them into another file - confirm against
+ * the caller).
+ *
+ * @param device2TabletsLinkedList device id to pending tablets; mutated by this method
+ * @param device2Aligned device id to whether the device is written as aligned
+ * @throws IOException propagated from the tsfile writer
+ * @throws WriteProcessException propagated from {@code write} / {@code writeAligned}
+ */
+ private void tryBestToWriteTabletsIntoOneFile(
+ LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
+ Map<String, Boolean> device2Aligned)
+ throws IOException, WriteProcessException {
+ final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator =
+ device2TabletsLinkedList.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next();
+ final String deviceId = entry.getKey();
+ final LinkedList<Tablet> tablets = entry.getValue();
+
+ final List<Tablet> tabletsToWrite = new ArrayList<>();
+
+ // Peel tablets off the head of the list until one starts at or before the end of the
+ // previously taken tablet.
+ Tablet lastTablet = null;
+ while (!tablets.isEmpty()) {
+ final Tablet tablet = tablets.peekFirst();
+ if (Objects.isNull(lastTablet)
+ // lastTablet.rowSize is not 0
+ || lastTablet.timestamps[lastTablet.rowSize - 1] <
tablet.timestamps[0]) {
+ tabletsToWrite.add(tablet);
+ lastTablet = tablet;
+ tablets.pollFirst();
+ } else {
+ break;
+ }
+ }
+
+ // Nothing left for this device: drop it through the iterator so the map can be modified
+ // while it is being traversed.
+ if (tablets.isEmpty()) {
+ iterator.remove();
+ }
+
+ // Auto-unboxing: a device id without an entry in device2Aligned would throw a
+ // NullPointerException here.
+ final boolean isAligned = device2Aligned.get(deviceId);
+ for (final Tablet tablet : tabletsToWrite) {
+ if (isAligned) {
+ try {
+ fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId),
tablet.getSchemas());
+ } catch (final WriteProcessException ignore) {
+ // Do nothing if the timeSeries has been registered
+ }
+
+ fileWriter.writeAligned(tablet);
+ } else {
+ // Non-aligned devices register each measurement separately.
+ for (final MeasurementSchema schema : tablet.getSchemas()) {
+ try {
+ fileWriter.registerTimeseries(new Path(tablet.deviceId), schema);
+ } catch (final WriteProcessException ignore) {
+ // Do nothing if the timeSeries has been registered
+ }
+ }
+
+ fileWriter.write(tablet);
+ }
+ }
+ }
+ }
+
@Override
protected long getMaxBatchSizeInBytes() {
return maxSizeInBytes;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
index 33a98d84e42..ed4ca323fbe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -23,18 +23,15 @@ import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.IoTDBConnectorRequestVersion;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
import org.apache.iotdb.db.queryengine.plan.parser.StatementGenerator;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TSInsertTabletReq;
import org.apache.iotdb.session.util.SessionUtils;
-import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.utils.Binary;
-import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
@@ -43,9 +40,6 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.time.LocalDate;
-import java.util.Arrays;
-import java.util.Comparator;
import java.util.Objects;
public class PipeTransferTabletRawReq extends TPipeTransferReq {
@@ -64,9 +58,7 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
}
public InsertTabletStatement constructStatement() {
- if (!checkSorted(tablet)) {
- sortTablet(tablet);
- }
+ new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
try {
final TSInsertTabletReq request = new TSInsertTabletReq();
@@ -91,127 +83,6 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
}
}
- public static boolean checkSorted(final Tablet tablet) {
- for (int i = 1; i < tablet.rowSize; i++) {
- if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
- return false;
- }
- }
- return true;
- }
-
- public static void sortTablet(final Tablet tablet) {
- /*
- * following part of code sort the batch data by time,
- * so we can insert continuous data in value list to get a better
performance
- */
- // sort to get index, and use index to sort value list
- final Integer[] index = new Integer[tablet.rowSize];
- for (int i = 0; i < tablet.rowSize; i++) {
- index[i] = i;
- }
- Arrays.sort(index, Comparator.comparingLong(o -> tablet.timestamps[o]));
- Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
- int columnIndex = 0;
- for (int i = 0; i < tablet.getSchemas().size(); i++) {
- final IMeasurementSchema schema = tablet.getSchemas().get(i);
- if (schema != null) {
- tablet.values[columnIndex] = sortList(tablet.values[columnIndex],
schema.getType(), index);
- if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
- tablet.bitMaps[columnIndex] =
sortBitMap(tablet.bitMaps[columnIndex], index);
- }
- columnIndex++;
- }
- }
- }
-
- /**
- * Sort value list by index.
- *
- * @param valueList value list
- * @param dataType data type
- * @param index index
- * @return sorted list
- * @throws UnSupportedDataTypeException if dataType is illegal
- */
- private static Object sortList(
- final Object valueList, final TSDataType dataType, final Integer[]
index) {
- switch (dataType) {
- case BOOLEAN:
- final boolean[] boolValues = (boolean[]) valueList;
- final boolean[] sortedValues = new boolean[boolValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedValues[i] = boolValues[index[i]];
- }
- return sortedValues;
- case INT32:
- final int[] intValues = (int[]) valueList;
- final int[] sortedIntValues = new int[intValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedIntValues[i] = intValues[index[i]];
- }
- return sortedIntValues;
- case DATE:
- final LocalDate[] dateValues = (LocalDate[]) valueList;
- final LocalDate[] sortedDateValues = new LocalDate[dateValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedDateValues[i] = dateValues[index[i]];
- }
- return sortedDateValues;
- case INT64:
- case TIMESTAMP:
- final long[] longValues = (long[]) valueList;
- final long[] sortedLongValues = new long[longValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedLongValues[i] = longValues[index[i]];
- }
- return sortedLongValues;
- case FLOAT:
- final float[] floatValues = (float[]) valueList;
- final float[] sortedFloatValues = new float[floatValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedFloatValues[i] = floatValues[index[i]];
- }
- return sortedFloatValues;
- case DOUBLE:
- final double[] doubleValues = (double[]) valueList;
- final double[] sortedDoubleValues = new double[doubleValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedDoubleValues[i] = doubleValues[index[i]];
- }
- return sortedDoubleValues;
- case TEXT:
- case BLOB:
- case STRING:
- final Binary[] binaryValues = (Binary[]) valueList;
- final Binary[] sortedBinaryValues = new Binary[binaryValues.length];
- for (int i = 0; i < index.length; i++) {
- sortedBinaryValues[i] = binaryValues[index[i]];
- }
- return sortedBinaryValues;
- default:
- throw new UnSupportedDataTypeException(
- String.format("Data type %s is not supported.", dataType));
- }
- }
-
- /**
- * Sort BitMap by index.
- *
- * @param bitMap BitMap to be sorted
- * @param index index
- * @return sorted bitMap
- */
- private static BitMap sortBitMap(final BitMap bitMap, final Integer[] index)
{
- final BitMap sortedBitMap = new BitMap(bitMap.getSize());
- for (int i = 0; i < index.length; i++) {
- if (bitMap.isMarked(index[i])) {
- sortedBitMap.mark(i);
- }
- }
- return sortedBitMap;
- }
-
/////////////////////////////// WriteBack & Batch
///////////////////////////////
public static PipeTransferTabletRawReq toTPipeTransferRawReq(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
new file mode 100644
index 00000000000..2a5e8769b59
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/PipeTabletEventSorter.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Sorts the rows of a {@link Tablet} by timestamp and drops rows whose timestamp repeats,
+ * reordering the value columns and bitmaps to match.
+ *
+ * <p>The tablet passed to the constructor is modified directly: its timestamps are compacted in
+ * place, its {@code rowSize} is lowered to the number of rows kept, and its value arrays and
+ * bitmaps are replaced by reordered copies.
+ */
+public class PipeTabletEventSorter {
+
+ // The tablet that is rewritten by this sorter.
+ private final Tablet tablet;
+
+ // Cleared when some timestamp is smaller than its predecessor.
+ private boolean isSorted = true;
+ // Cleared when some timestamp equals its predecessor (adjacent rows only).
+ private boolean isDeduplicated = true;
+
+ // index[i] is the original row whose values end up at row i.
+ private Integer[] index;
+ // Number of rows kept; starts as the tablet's rowSize (0 for a null tablet).
+ private int deduplicatedSize;
+
+ /**
+ * Creates a sorter for the given tablet.
+ *
+ * @param tablet the tablet to rewrite; {@code null} is accepted and makes the sorter a no-op
+ */
+ public PipeTabletEventSorter(final Tablet tablet) {
+ this.tablet = tablet;
+ deduplicatedSize = tablet == null ? 0 : tablet.rowSize;
+ }
+
+ /**
+ * Makes the timestamps of the tablet strictly increasing.
+ *
+ * <p>Returns without touching the tablet when it is {@code null}, has no rows, or already has
+ * no decreasing and no equal adjacent timestamps. Otherwise the timestamps are sorted if
+ * needed, repeated timestamps are collapsed to one row each, and every value column and bitmap
+ * is rebuilt in the resulting row order.
+ */
+ public void deduplicateAndSortTimestampsIfNecessary() {
+ if (tablet == null || tablet.rowSize == 0) {
+ return;
+ }
+
+ // Single pass over adjacent pairs; stops early once both problems have been seen.
+ for (int i = 1, size = tablet.rowSize; i < size; ++i) {
+ final long currentTimestamp = tablet.timestamps[i];
+ final long previousTimestamp = tablet.timestamps[i - 1];
+
+ if (currentTimestamp < previousTimestamp) {
+ isSorted = false;
+ }
+ if (currentTimestamp == previousTimestamp) {
+ isDeduplicated = false;
+ }
+
+ if (!isSorted && !isDeduplicated) {
+ break;
+ }
+ }
+
+ if (isSorted && isDeduplicated) {
+ return;
+ }
+
+ // Start from the identity mapping: row i comes from original row i.
+ index = new Integer[tablet.rowSize];
+ for (int i = 0, size = tablet.rowSize; i < size; i++) {
+ index[i] = i;
+ }
+
+ if (!isSorted) {
+ sortTimestamps();
+
+ // Do deduplicate anyway.
+ // isDeduplicated may be false positive when isSorted is false.
+ deduplicateTimestamps();
+ isDeduplicated = true;
+ }
+
+ if (!isDeduplicated) {
+ deduplicateTimestamps();
+ }
+
+ sortAndDeduplicateValuesAndBitMaps();
+ }
+
+ /**
+ * Sorts {@code index} by the timestamps it refers to, then sorts the timestamps themselves.
+ * The index must be sorted first because its comparator reads the still-unsorted timestamps.
+ */
+ private void sortTimestamps() {
+ Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i]));
+ Arrays.sort(tablet.timestamps, 0, tablet.rowSize);
+ }
+
+ /**
+ * Compacts the (already ordered) timestamps and {@code index} so that each distinct timestamp
+ * appears once, and lowers {@code tablet.rowSize} to the number of rows kept.
+ *
+ * <p>For a run of equal timestamps the first entry of the run is the one kept.
+ * NOTE(review): if a later row is meant to override an earlier row with the same timestamp,
+ * confirm that keeping the first one is intended.
+ */
+ private void deduplicateTimestamps() {
+ deduplicatedSize = 1;
+ for (int i = 1, size = tablet.rowSize; i < size; i++) {
+ if (tablet.timestamps[i] != tablet.timestamps[i - 1]) {
+ index[deduplicatedSize] = index[i];
+ tablet.timestamps[deduplicatedSize] = tablet.timestamps[i];
+
+ ++deduplicatedSize;
+ }
+ }
+ tablet.rowSize = deduplicatedSize;
+ }
+
+ /**
+ * Replaces every value column, and every non-null bitmap, with a copy arranged according to
+ * {@code index}. The column position only advances for schemas that are not {@code null}.
+ */
+ private void sortAndDeduplicateValuesAndBitMaps() {
+ int columnIndex = 0;
+ for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
+ final IMeasurementSchema schema = tablet.getSchemas().get(i);
+ if (schema != null) {
+ tablet.values[columnIndex] =
+ reorderValueList(deduplicatedSize, tablet.values[columnIndex],
schema.getType(), index);
+ if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
+ tablet.bitMaps[columnIndex] =
+ reorderBitMap(deduplicatedSize, tablet.bitMaps[columnIndex],
index);
+ }
+ columnIndex++;
+ }
+ }
+ }
+
+ /**
+ * Builds a new array of the same type and length as {@code valueList} whose first {@code
+ * deduplicatedSize} entries are {@code valueList[index[0]]}, {@code valueList[index[1]]}, ...
+ * The remaining entries keep the default value of the element type.
+ *
+ * @param deduplicatedSize number of leading entries to fill
+ * @param valueList the source column, a primitive or object array matching {@code dataType}
+ * @param dataType the type of the column, used to pick the array type
+ * @param index source row for each destination row
+ * @return the reordered copy
+ * @throws UnSupportedDataTypeException if {@code dataType} is not one of the handled types
+ */
+ private static Object reorderValueList(
+ int deduplicatedSize,
+ final Object valueList,
+ final TSDataType dataType,
+ final Integer[] index) {
+ switch (dataType) {
+ case BOOLEAN:
+ final boolean[] boolValues = (boolean[]) valueList;
+ final boolean[] deduplicatedBoolValues = new
boolean[boolValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedBoolValues[i] = boolValues[index[i]];
+ }
+ return deduplicatedBoolValues;
+ case INT32:
+ final int[] intValues = (int[]) valueList;
+ final int[] deduplicatedIntValues = new int[intValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedIntValues[i] = intValues[index[i]];
+ }
+ return deduplicatedIntValues;
+ case DATE:
+ final LocalDate[] dateValues = (LocalDate[]) valueList;
+ final LocalDate[] deduplicatedDateValues = new
LocalDate[dateValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedDateValues[i] = dateValues[index[i]];
+ }
+ return deduplicatedDateValues;
+ case INT64:
+ case TIMESTAMP:
+ final long[] longValues = (long[]) valueList;
+ final long[] deduplicatedLongValues = new long[longValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedLongValues[i] = longValues[index[i]];
+ }
+ return deduplicatedLongValues;
+ case FLOAT:
+ final float[] floatValues = (float[]) valueList;
+ final float[] deduplicatedFloatValues = new float[floatValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedFloatValues[i] = floatValues[index[i]];
+ }
+ return deduplicatedFloatValues;
+ case DOUBLE:
+ final double[] doubleValues = (double[]) valueList;
+ final double[] deduplicatedDoubleValues = new
double[doubleValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedDoubleValues[i] = doubleValues[index[i]];
+ }
+ return deduplicatedDoubleValues;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ final Binary[] binaryValues = (Binary[]) valueList;
+ final Binary[] deduplicatedBinaryValues = new
Binary[binaryValues.length];
+ for (int i = 0; i < deduplicatedSize; i++) {
+ deduplicatedBinaryValues[i] = binaryValues[index[i]];
+ }
+ return deduplicatedBinaryValues;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ }
+
+ /**
+ * Builds a new bitmap of the same size as {@code bitMap} in which position {@code i} is marked
+ * exactly when position {@code index[i]} is marked in the source, for the first {@code
+ * deduplicatedSize} positions.
+ *
+ * @param deduplicatedSize number of leading positions to transfer
+ * @param bitMap the source bitmap
+ * @param index source row for each destination row
+ * @return the reordered copy
+ */
+ private static BitMap reorderBitMap(
+ int deduplicatedSize, final BitMap bitMap, final Integer[] index) {
+ final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize());
+ for (int i = 0; i < deduplicatedSize; i++) {
+ if (bitMap.isMarked(index[i])) {
+ deduplicatedBitMap.mark(i);
+ }
+ }
+ return deduplicatedBitMap;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
new file mode 100644
index 00000000000..e58bcf1c294
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector;
+
+import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Unit tests for {@link PipeTabletEventSorter}.
+ *
+ * <p>Every row written by these tests stores its own timestamp as the value of each measurement.
+ * A value column therefore equals the timestamp column exactly when the sorter moved and dropped
+ * the values of that column consistently with the timestamps.
+ */
+public class PipeTabletEventSorterTest {
+
+  private static final int MEASUREMENT_COUNT = 3;
+
+  /** Returns whether the first {@code rowSize} timestamps are in non-decreasing order. */
+  private static boolean checkSorted(final Tablet tablet) {
+    for (int i = 1; i < tablet.rowSize; i++) {
+      if (tablet.timestamps[i] < tablet.timestamps[i - 1]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Builds the INT64 measurements s1, s2 and s3 used by every test. */
+  private static List<MeasurementSchema> createSchemaList() {
+    final List<MeasurementSchema> schemaList = new ArrayList<>();
+    for (int i = 1; i <= MEASUREMENT_COUNT; i++) {
+      schemaList.add(new MeasurementSchema("s" + i, TSDataType.INT64));
+    }
+    return schemaList;
+  }
+
+  /** Appends one row whose timestamp and every measurement value equal {@code timestamp}. */
+  private static void appendRow(
+      final Tablet tablet, final List<MeasurementSchema> schemaList, final long timestamp) {
+    final int rowIndex = tablet.rowSize++;
+    tablet.addTimestamp(rowIndex, timestamp);
+    for (final MeasurementSchema schema : schemaList) {
+      tablet.addValue(schema.getMeasurementId(), rowIndex, timestamp);
+    }
+  }
+
+  /** Counts the distinct timestamps among the first {@code rowSize} rows. */
+  private static int countDistinctTimestamps(final Tablet tablet) {
+    final Set<Long> distinctTimestamps = new HashSet<>();
+    for (int i = 0; i < tablet.rowSize; i++) {
+      distinctTimestamps.add(tablet.timestamps[i]);
+    }
+    return distinctTimestamps.size();
+  }
+
+  /** Asserts that EVERY value column (not only the first) equals the timestamp column. */
+  private static void assertValuesMatchTimestamps(final Tablet tablet) {
+    final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize);
+    for (int column = 0; column < MEASUREMENT_COUNT; ++column) {
+      Assert.assertArrayEquals(
+          timestamps, Arrays.copyOfRange((long[]) tablet.values[column], 0, tablet.rowSize));
+    }
+  }
+
+  /** Asserts that timestamps and all value columns are strictly increasing row by row. */
+  private static void assertStrictlyIncreasing(final Tablet tablet) {
+    for (int row = 1; row < tablet.rowSize; ++row) {
+      Assert.assertTrue(tablet.timestamps[row] > tablet.timestamps[row - 1]);
+      for (int column = 0; column < MEASUREMENT_COUNT; ++column) {
+        final long[] values = (long[]) tablet.values[column];
+        Assert.assertTrue(values[row] > values[row - 1]);
+      }
+    }
+  }
+
+  /** Unsorted input that also contains repeated timestamps. */
+  @Test
+  public void testDeduplicateAndSort() {
+    final List<MeasurementSchema> schemaList = createSchemaList();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, timestamp + i);
+      appendRow(tablet, schemaList, timestamp - i);
+      appendRow(tablet, schemaList, timestamp);
+    }
+
+    final int expectedRowSize = countDistinctTimestamps(tablet);
+
+    Assert.assertFalse(checkSorted(tablet));
+
+    new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+    Assert.assertEquals(expectedRowSize, tablet.rowSize);
+    assertValuesMatchTimestamps(tablet);
+    assertStrictlyIncreasing(tablet);
+  }
+
+  /** Already ordered input in which every row carries the same timestamp. */
+  @Test
+  public void testDeduplicate() {
+    final List<MeasurementSchema> schemaList = createSchemaList();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 10);
+
+    final long timestamp = 300;
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, timestamp);
+    }
+
+    final int expectedRowSize = countDistinctTimestamps(tablet);
+
+    Assert.assertTrue(checkSorted(tablet));
+
+    new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+    Assert.assertEquals(expectedRowSize, tablet.rowSize);
+    assertValuesMatchTimestamps(tablet);
+    assertStrictlyIncreasing(tablet);
+  }
+
+  /** Unsorted input without repeated timestamps: no row may be dropped. */
+  @Test
+  public void testSort() {
+    final List<MeasurementSchema> schemaList = createSchemaList();
+    final Tablet tablet = new Tablet("root.sg.device", schemaList, 30);
+
+    // Each group of three rows is written in descending order (r + 2, r, r - 2 for the row index
+    // r at the time of writing), which yields every timestamp in 0..29 exactly once.
+    for (long i = 0; i < 10; i++) {
+      appendRow(tablet, schemaList, tablet.rowSize + 2L);
+      appendRow(tablet, schemaList, tablet.rowSize);
+      appendRow(tablet, schemaList, tablet.rowSize - 2L);
+    }
+
+    final int expectedRowSize = countDistinctTimestamps(tablet);
+
+    Assert.assertFalse(checkSorted(tablet));
+
+    // Sanity check of the fixture itself before the sorter runs.
+    assertValuesMatchTimestamps(tablet);
+    for (int row = 1; row < tablet.rowSize; ++row) {
+      Assert.assertTrue(tablet.timestamps[row] != tablet.timestamps[row - 1]);
+      for (int column = 0; column < MEASUREMENT_COUNT; ++column) {
+        final long[] values = (long[]) tablet.values[column];
+        Assert.assertTrue(values[row] != values[row - 1]);
+      }
+    }
+
+    new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
+
+    Assert.assertTrue(checkSorted(tablet));
+    Assert.assertEquals(expectedRowSize, tablet.rowSize);
+    assertValuesMatchTimestamps(tablet);
+    assertStrictlyIncreasing(tablet);
+  }
+}