This is an automated email from the ASF dual-hosted git repository. rong pushed a commit to branch tsfile-batch-ordering in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 4aaa30745001fa633b7034ed78974365bb877d34 Author: Steve Yurong Su <[email protected]> AuthorDate: Wed Jun 26 11:36:44 2024 +0800 Pipe: Avoid writing out-of-order data in tsfile when `'sink.format' = 'file'` --- .../evolvable/batch/PipeTabletEventSorter.java | 196 +++++++++++++++++++ .../batch/PipeTabletEventTsFileBatch.java | 162 +++++++++++----- .../pipe/connector/PipeTabletEventSorterTest.java | 215 +++++++++++++++++++++ 3 files changed, 521 insertions(+), 52 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java new file mode 100644 index 00000000000..ca24972d798 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventSorter.java @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.UnSupportedDataTypeException; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; + +import java.time.LocalDate; +import java.util.Arrays; +import java.util.Comparator; + +public class PipeTabletEventSorter { + + private final Tablet tablet; + + private boolean isSorted = true; + private boolean isDeduplicated = true; + + private Integer[] index; + private int deduplicatedSize; + + public PipeTabletEventSorter(final Tablet tablet) { + this.tablet = tablet; + deduplicatedSize = tablet == null ? 0 : tablet.rowSize; + } + + public void deduplicateAndSortTimestampsIfNecessary() { + if (tablet == null || tablet.rowSize == 0) { + return; + } + + for (int i = 1, size = tablet.rowSize; i < size; ++i) { + final long currentTimestamp = tablet.timestamps[i]; + final long previousTimestamp = tablet.timestamps[i - 1]; + + if (currentTimestamp < previousTimestamp) { + isSorted = false; + } + if (currentTimestamp == previousTimestamp) { + isDeduplicated = false; + } + + if (!isSorted && !isDeduplicated) { + break; + } + } + + if (isSorted && isDeduplicated) { + return; + } + + index = new Integer[tablet.rowSize]; + for (int i = 0, size = tablet.rowSize; i < size; i++) { + index[i] = i; + } + + if (!isSorted) { + sortTimestamps(); + } + if (!isDeduplicated) { + deduplicateTimestamps(); + } + sortAndDeduplicateValuesAndBitMaps(); + } + + private void sortTimestamps() { + Arrays.sort(index, Comparator.comparingLong(i -> tablet.timestamps[i])); + Arrays.sort(tablet.timestamps, 0, tablet.rowSize); + } + + private void deduplicateTimestamps() { + deduplicatedSize = 1; + for (int i = 1, size = tablet.rowSize; i < size; i++) { + if (tablet.timestamps[i] != tablet.timestamps[i - 1]) { + index[deduplicatedSize] = index[i]; + tablet.timestamps[deduplicatedSize] = tablet.timestamps[i]; + + ++deduplicatedSize; + } + } + tablet.rowSize = deduplicatedSize; + } + + private void sortAndDeduplicateValuesAndBitMaps() { + int columnIndex = 0; + for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) { + final IMeasurementSchema schema = tablet.getSchemas().get(i); + if (schema != null) { + tablet.values[columnIndex] = + deduplicateValueList( + deduplicatedSize, tablet.values[columnIndex], schema.getType(), index); + if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) { + tablet.bitMaps[columnIndex] = + deduplicateBitMap(deduplicatedSize, tablet.bitMaps[columnIndex], index); + } + columnIndex++; + } + } + } + + private static Object deduplicateValueList( + int deduplicatedSize, + final Object valueList, + final TSDataType dataType, + final Integer[] index) { + switch (dataType) { + case BOOLEAN: + final boolean[] boolValues = (boolean[]) valueList; + final boolean[] deduplicatedBoolValues = new boolean[boolValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedBoolValues[i] = boolValues[index[i]]; + } + return deduplicatedBoolValues; + case INT32: + final int[] intValues = (int[]) valueList; + final int[] deduplicatedIntValues = new int[intValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedIntValues[i] = intValues[index[i]]; + } + return deduplicatedIntValues; + case DATE: + final LocalDate[] dateValues = (LocalDate[]) valueList; + final LocalDate[] deduplicatedDateValues = new LocalDate[dateValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedDateValues[i] = dateValues[index[i]]; + } + return deduplicatedDateValues; + case INT64: + case TIMESTAMP: + final long[] longValues = (long[]) valueList; + final long[] deduplicatedLongValues = new long[longValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedLongValues[i] = longValues[index[i]]; + } + return deduplicatedLongValues; + case FLOAT: + final float[] floatValues = (float[]) valueList; + final float[] deduplicatedFloatValues = new float[floatValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedFloatValues[i] = floatValues[index[i]]; + } + return deduplicatedFloatValues; + case DOUBLE: + final double[] doubleValues = (double[]) valueList; + final double[] deduplicatedDoubleValues = new double[doubleValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedDoubleValues[i] = doubleValues[index[i]]; + } + return deduplicatedDoubleValues; + case TEXT: + case BLOB: + case STRING: + final Binary[] binaryValues = (Binary[]) valueList; + final Binary[] deduplicatedBinaryValues = new Binary[binaryValues.length]; + for (int i = 0; i < deduplicatedSize; i++) { + deduplicatedBinaryValues[i] = binaryValues[index[i]]; + } + return deduplicatedBinaryValues; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + } + + private static BitMap deduplicateBitMap( + int deduplicatedSize, final BitMap bitMap, final Integer[] index) { + final BitMap deduplicatedBitMap = new BitMap(bitMap.getSize()); + for (int i = 0; i < deduplicatedSize; i++) { + if (bitMap.isMarked(index[i])) { + deduplicatedBitMap.mark(i); + } + } + return deduplicatedBitMap; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 6abca7b3183..0015db2e192 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -48,6 +48,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -165,7 +166,12 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { } private void bufferTablet( - final String pipeName, long creationTime, Tablet tablet, boolean isAligned) { + final String pipeName, + final long creationTime, + final Tablet tablet, + final boolean isAligned) { + new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet); pipeName2WeightMap.compute( @@ -208,12 +214,19 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { Comparator.comparingLong(tablet -> tablet.timestamps[0])); } + // Sort the devices by device id + List<String> devices = new ArrayList<>(device2Tablets.keySet()); + devices.sort(Comparator.naturalOrder()); + // Replace ArrayList with LinkedList to improve performance - final Map<String, LinkedList<Tablet>> device2TabletsLinkedList = new HashMap<>(); - for (final Map.Entry<String, List<Tablet>> entry : device2Tablets.entrySet()) { - device2TabletsLinkedList.put(entry.getKey(), new LinkedList<>(entry.getValue())); + final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList = + new LinkedHashMap<>(); + for (final String device : devices) { + device2TabletsLinkedList.put(device, new LinkedList<>(device2Tablets.get(device))); } - // Clear the original device2Tablets to release memory + + // Help GC + devices.clear(); device2Tablets.clear(); // Write the tablets to the tsfile device by device, and the tablets @@ -241,56 +254,44 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { + TsFileConstant.TSFILE_SUFFIX)); } - final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator = - device2TabletsLinkedList.entrySet().iterator(); - - while (iterator.hasNext()) { - final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next(); - final String deviceId = entry.getKey(); - final LinkedList<Tablet> tablets = entry.getValue(); - - final List<Tablet> tabletsToWrite = new ArrayList<>(); - - Tablet lastTablet = null; - while (!tablets.isEmpty()) { - final Tablet tablet = tablets.peekFirst(); - if (Objects.isNull(lastTablet) - // lastTablet.rowSize is not 0 - || lastTablet.timestamps[lastTablet.rowSize - 1] < tablet.timestamps[0]) { - tabletsToWrite.add(tablet); - lastTablet = tablet; - tablets.pollFirst(); - } else { - break; - } - } + try { + tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, device2Aligned); + } catch (final Exception e) { + LOGGER.warn( + "Batch id = {}: Failed to write tablets into tsfile, because {}", + currentBatchId.get(), + e.getMessage(), + e); - if (tablets.isEmpty()) { - iterator.remove(); + try { + fileWriter.close(); + + final File sealedFile = fileWriter.getIOWriter().getFile(); + sealedFiles.add(sealedFile); + LOGGER.info( + "Batch id = {}: Seal tsfile {} successfully after failed to write tablets into it. ", + currentBatchId.get(), + sealedFile.getPath()); + + fileWriter = null; + } catch (final Exception sealException) { + LOGGER.warn( + "Batch id = {}: Failed to seal tsfile after failed to write tablets into it. ", + currentBatchId.get(), + sealException); + + final boolean deleteSuccess = FileUtils.deleteQuietly(fileWriter.getIOWriter().getFile()); + LOGGER.warn( + "Batch id = {}: Delete the tsfile {} after failed to seal it {}. " + + "Maybe the tsfile needs to be deleted manually.", + currentBatchId.get(), + fileWriter.getIOWriter().getFile().getPath(), + deleteSuccess ? "successfully" : "unsuccessfully"); + + fileWriter = null; } - final boolean isAligned = device2Aligned.get(deviceId); - for (final Tablet tablet : tabletsToWrite) { - if (isAligned) { - try { - fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), tablet.getSchemas()); - } catch (final WriteProcessException ignore) { - // Do nothing if the timeSeries has been registered - } - - fileWriter.writeAligned(tablet); - } else { - for (final MeasurementSchema schema : tablet.getSchemas()) { - try { - fileWriter.registerTimeseries(new Path(tablet.deviceId), schema); - } catch (final WriteProcessException ignore) { - // Do nothing if the timeSeries has been registered - } - } - - fileWriter.write(tablet); - } - } + throw e; } fileWriter.close(); @@ -308,6 +309,63 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { return sealedFiles; } + private void tryBestToWriteTabletsIntoOneFile( + LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList, + Map<String, Boolean> device2Aligned) + throws IOException, WriteProcessException { + final Iterator<Map.Entry<String, LinkedList<Tablet>>> iterator = + device2TabletsLinkedList.entrySet().iterator(); + + while (iterator.hasNext()) { + final Map.Entry<String, LinkedList<Tablet>> entry = iterator.next(); + final String deviceId = entry.getKey(); + final LinkedList<Tablet> tablets = entry.getValue(); + + final List<Tablet> tabletsToWrite = new ArrayList<>(); + + Tablet lastTablet = null; + while (!tablets.isEmpty()) { + final Tablet tablet = tablets.peekFirst(); + if (Objects.isNull(lastTablet) + // lastTablet.rowSize is not 0 + || lastTablet.timestamps[lastTablet.rowSize - 1] < tablet.timestamps[0]) { + tabletsToWrite.add(tablet); + lastTablet = tablet; + tablets.pollFirst(); + } else { + break; + } + } + + if (tablets.isEmpty()) { + iterator.remove(); + } + + final boolean isAligned = device2Aligned.get(deviceId); + for (final Tablet tablet : tabletsToWrite) { + if (isAligned) { + try { + fileWriter.registerAlignedTimeseries(new Path(tablet.deviceId), tablet.getSchemas()); + } catch (final WriteProcessException ignore) { + // Do nothing if the timeSeries has been registered + } + + fileWriter.writeAligned(tablet); + } else { + for (final MeasurementSchema schema : tablet.getSchemas()) { + try { + fileWriter.registerTimeseries(new Path(tablet.deviceId), schema); + } catch (final WriteProcessException ignore) { + // Do nothing if the timeSeries has been registered + } + } + + fileWriter.write(tablet); + } + } + } + } + @Override protected long getMaxBatchSizeInBytes() { return maxSizeInBytes; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java new file mode 100644 index 00000000000..728eb9f1c63 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/connector/PipeTabletEventSorterTest.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.connector; + +import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventSorter; +import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +public class PipeTabletEventSorterTest { + + public static void main(String[] args) { + new PipeTabletEventSorterTest().testDeduplicateAndSort(); + new PipeTabletEventSorterTest().testDeduplicate(); + new PipeTabletEventSorterTest().testSort(); + } + + @Test + public void testDeduplicateAndSort() { + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.device", schemaList, 30); + + long timestamp = 300; + for (long i = 0; i < 10; i++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp + i); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp + i); + } + + rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp - i); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp - i); + } + + rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp); + } + } + + Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 30; i++) { + indices.add((int) tablet.timestamps[i]); + } + + Assert.assertFalse(PipeTransferTabletRawReq.checkSorted(tablet)); + + new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet)); + + Assert.assertEquals(indices.size(), tablet.rowSize); + + final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize)); + } + + for (int i = 1; i < tablet.rowSize; ++i) { + Assert.assertTrue(timestamps[i] > timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) tablet.values[j])[i - 1]); + } + } + } + + @Test + public void testDeduplicate() { + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.device", schemaList, 10); + + long timestamp = 300; + for (long i = 0; i < 10; i++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, timestamp); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, timestamp); + } + } + + Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 10; i++) { + indices.add((int) tablet.timestamps[i]); + } + + Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet)); + + new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet)); + + Assert.assertEquals(indices.size(), tablet.rowSize); + + final long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize)); + } + + for (int i = 1; i < tablet.rowSize; ++i) { + Assert.assertTrue(timestamps[i] > timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) tablet.values[j])[i - 1]); + } + } + } + + @Test + public void testSort() { + List<MeasurementSchema> schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT64)); + schemaList.add(new MeasurementSchema("s3", TSDataType.INT64)); + + Tablet tablet = new Tablet("root.sg.device", schemaList, 30); + + for (long i = 0; i < 10; i++) { + int rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, (long) rowIndex + 2); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) rowIndex + 2); + } + + rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, (long) rowIndex); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) rowIndex); + } + + rowIndex = tablet.rowSize++; + tablet.addTimestamp(rowIndex, (long) rowIndex - 2); + for (int s = 0; s < 3; s++) { + tablet.addValue(schemaList.get(s).getMeasurementId(), rowIndex, (long) rowIndex - 2); + } + } + + Set<Integer> indices = new HashSet<>(); + for (int i = 0; i < 30; i++) { + indices.add((int) tablet.timestamps[i]); + } + + Assert.assertFalse(PipeTransferTabletRawReq.checkSorted(tablet)); + + long[] timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize)); + } + + for (int i = 1; i < tablet.rowSize; ++i) { + Assert.assertTrue(timestamps[i] != timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) tablet.values[j])[i] != ((long[]) tablet.values[j])[i - 1]); + } + } + + new PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary(); + + Assert.assertTrue(PipeTransferTabletRawReq.checkSorted(tablet)); + + Assert.assertEquals(indices.size(), tablet.rowSize); + + timestamps = Arrays.copyOfRange(tablet.timestamps, 0, tablet.rowSize); + for (int i = 0; i < 3; ++i) { + Assert.assertArrayEquals( + timestamps, Arrays.copyOfRange((long[]) tablet.values[0], 0, tablet.rowSize)); + } + + for (int i = 1; i < tablet.rowSize; ++i) { + Assert.assertTrue(timestamps[i] > timestamps[i - 1]); + for (int j = 0; j < 3; ++j) { + Assert.assertTrue(((long[]) tablet.values[j])[i] > ((long[]) tablet.values[j])[i - 1]); + } + } + } +}
