This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 5cb31e84dd40c756a269a3c75f022e30c029e077 Author: Zhenyu Luo <[email protected]> AuthorDate: Fri Aug 8 09:27:19 2025 +0800 Pipe: Repair the table model construction TabletBatch process causing memory expansion (#16123) * Pipe: Repair the table model construction TabletBatch process causing memory expansion * add ut * update PipeTabletEventPlainBatch * update PipeTabletEventPlainBatch (cherry picked from commit 256f3e726520beaeb4d5cac527873930c9d77cb9) --- .../evolvable/batch/PipeTabletEventPlainBatch.java | 108 ++++++++++++++- .../pipe/sink/PipeTabletEventPlainBatchTest.java | 147 +++++++++++++++++++++ .../db/pipe/sink/PipeTabletEventSorterTest.java | 4 +- 3 files changed, 251 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java index a68c5ae9602..0db905a6dc0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java @@ -28,16 +28,22 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; import org.apache.tsfile.write.record.Tablet; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.time.LocalDate; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -102,23 +108,28 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { final String databaseName = insertTablets.getKey(); for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry : insertTablets.getValue().entrySet()) { - final List<Tablet> batchTablets = new ArrayList<>(); + // needCopyFlag and tablet + final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>(); for (final Tablet tablet : tabletEntry.getValue().getRight()) { boolean success = false; - for (final Tablet batchTablet : batchTablets) { - if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) { + for (final Pair<Boolean, Tablet> tabletPair : batchTablets) { + if (tabletPair.getLeft()) { + tabletPair.setRight(copyTablet(tabletPair.getRight())); + tabletPair.setLeft(Boolean.FALSE); + } + if (tabletPair.getRight().append(tablet, tabletEntry.getValue().getLeft())) { success = true; break; } } if (!success) { - batchTablets.add(tablet); + batchTablets.add(new Pair<>(Boolean.TRUE, tablet)); } } - for (final Tablet batchTablet : batchTablets) { + for (final Pair<Boolean, Tablet> tabletPair : batchTablets) { try (final PublicBAOS byteArrayOutputStream = new PublicBAOS(); final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) { - batchTablet.serialize(outputStream); + tabletPair.getRight().serialize(outputStream); ReadWriteIOUtils.write(true, outputStream); tabletBuffers.add( ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size())); @@ -214,4 +225,89 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch { currentBatch.getRight().add(tablet); return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4; } + + public static Tablet copyTablet(final Tablet tablet) { + final Object[] copiedValues = new Object[tablet.getValues().length]; + for (int i = 0; i < tablet.getValues().length; i++) { + if (tablet.getValues()[i] == null + || tablet.getSchemas() == null + || tablet.getSchemas().get(i) == null) { + continue; + } + copiedValues[i] = + copyValueList( + tablet.getValues()[i], tablet.getSchemas().get(i).getType(), tablet.getRowSize()); + } + + BitMap[] bitMaps = null; + if (tablet.getBitMaps() != null) { + bitMaps = + Arrays.stream(tablet.getBitMaps()) + .map( + bitMap -> { + if (bitMap != null) { + final byte[] data = bitMap.getByteArray(); + return new BitMap(bitMap.getSize(), Arrays.copyOf(data, data.length)); + } + return null; + }) + .toArray(BitMap[]::new); + } + + return new Tablet( + tablet.getTableName(), + new ArrayList<>(tablet.getSchemas()), + new ArrayList<>(tablet.getColumnTypes()), + Arrays.copyOf(tablet.getTimestamps(), tablet.getRowSize()), + copiedValues, + bitMaps, + tablet.getRowSize()); + } + + private static Object copyValueList( + final Object valueList, final TSDataType dataType, final int rowSize) { + switch (dataType) { + case BOOLEAN: + final boolean[] boolValues = (boolean[]) valueList; + final boolean[] copiedBoolValues = new boolean[rowSize]; + System.arraycopy(boolValues, 0, copiedBoolValues, 0, rowSize); + return copiedBoolValues; + case INT32: + final int[] intValues = (int[]) valueList; + final int[] copiedIntValues = new int[rowSize]; + System.arraycopy(intValues, 0, copiedIntValues, 0, rowSize); + return copiedIntValues; + case DATE: + final LocalDate[] dateValues = (LocalDate[]) valueList; + final LocalDate[] copiedDateValues = new LocalDate[rowSize]; + System.arraycopy(dateValues, 0, copiedDateValues, 0, rowSize); + return copiedDateValues; + case INT64: + case TIMESTAMP: + final long[] longValues = (long[]) valueList; + final long[] copiedLongValues = new long[rowSize]; + System.arraycopy(longValues, 0, copiedLongValues, 0, rowSize); + return copiedLongValues; + case FLOAT: + final float[] floatValues = (float[]) valueList; + final float[] copiedFloatValues = new float[rowSize]; + System.arraycopy(floatValues, 0, copiedFloatValues, 0, rowSize); + return copiedFloatValues; + case DOUBLE: + final double[] doubleValues = (double[]) valueList; + final double[] copiedDoubleValues = new double[rowSize]; + System.arraycopy(doubleValues, 0, copiedDoubleValues, 0, rowSize); + return copiedDoubleValues; + case TEXT: + case BLOB: + case STRING: + final Binary[] binaryValues = (Binary[]) valueList; + final Binary[] copiedBinaryValues = new Binary[rowSize]; + System.arraycopy(binaryValues, 0, copiedBinaryValues, 0, rowSize); + return copiedBinaryValues; + default: + throw new UnSupportedDataTypeException( + String.format("Data type %s is not supported.", dataType)); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java new file mode 100644 index 00000000000..1815c205b57 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink; + +import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch; + +import org.apache.tsfile.write.record.Tablet; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Random; + +public class PipeTabletEventPlainBatchTest { + + @Test + public void constructTabletBatch() { + Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true); + + Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet); + + Assert.assertNotSame(tablet, copyTablet); + Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize()); + + for (int i = 0; i < tablet.getSchemas().size(); i++) { + for (int j = 0; j < tablet.getRowSize(); j++) { + Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i)); + } + } + } + + @Test + public void constructTabletBatch1() { + Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true); + tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true)); + Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet); + + Assert.assertNotSame(tablet, copyTablet); + Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize()); + + for (int i = 0; i < tablet.getSchemas().size(); i++) { + for (int j = 0; j < tablet.getRowSize(); j++) { + Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i)); + } + } + } + + @Test + public void constructTabletBatch2() { + Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true); + tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true)); + + tablet = PipeTabletEventPlainBatch.copyTablet(tablet); + tablet.getBitMaps()[1].markAll(); + tablet.getValues()[1] = null; + + Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet); + + Assert.assertNotSame(tablet, copyTablet); + Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize()); + + for (int i = 0; i < tablet.getSchemas().size(); i++) { + if (i == 1) { + Assert.assertTrue(tablet.getBitMaps()[i].isAllMarked()); + Assert.assertNull(copyTablet.getValues()[1]); + continue; + } + + for (int j = 0; j < tablet.getRowSize(); j++) { + Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i)); + } + } + } + + @Test + public void constructTabletBatch3() { + Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true); + tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true)); + + tablet = PipeTabletEventPlainBatch.copyTablet(tablet); + tablet.getBitMaps()[1] = null; + + Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet); + + Assert.assertNotSame(tablet, copyTablet); + Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize()); + + for (int i = 0; i < tablet.getSchemas().size(); i++) { + if (i == 1) { + Assert.assertNull(tablet.getBitMaps()[i]); + continue; + } + + for (int j = 0; j < tablet.getRowSize(); j++) { + Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i)); + } + } + } + + @Test + public void constructTabletBatch4() { + Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true); + tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true)); + + List<Integer> rowIndices = new ArrayList<>(tablet.getSchemas().size()); + Random random = new Random(); + for (int i = 0; i < tablet.getSchemas().size(); i++) { + int r = random.nextInt(tablet.getRowSize()); + rowIndices.add(r); + tablet.addValue(tablet.getSchemas().get(i).getMeasurementName(), r, null); + } + + Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet); + + Assert.assertNotSame(tablet, copyTablet); + Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize()); + + for (int i = 0; i < tablet.getSchemas().size(); i++) { + for (int j = 0; j < tablet.getRowSize(); j++) { + if (rowIndices.get(i) == j) { + Assert.assertTrue(tablet.getBitMaps()[i].isMarked(j)); + Assert.assertNull(tablet.getValue(j, i)); + continue; + } + Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i)); + } + } + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java index 4e9a9a95005..eb5ffd475d3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java @@ -277,7 +277,7 @@ public class PipeTabletEventSorterTest { } } - private Tablet generateTablet( + static Tablet generateTablet( final String tableName, final int deviceIDNum, final boolean hasDuplicates, @@ -389,7 +389,7 @@ public class PipeTabletEventSorterTest { return tablet; } - public LocalDate getDate(final int value) { + public static LocalDate getDate(final int value) { Date date = new Date(value); SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd"); try {
