This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a84bee756a5 Pipe: use mem table to batch write tree data into tsfile
(#15276)
a84bee756a5 is described below
commit a84bee756a580fd1d0a9b4264da193bae6fcacc8
Author: VGalaxies <[email protected]>
AuthorDate: Thu Apr 17 12:21:36 2025 +0800
Pipe: use mem table to batch write tree data into tsfile (#15276)
---
.../pattern/IoTDBTSPatternPullConsumeTsfileIT.java | 2 -
.../IoTDBTSPatternTsfilePushConsumerIT.java | 2 +-
.../batch/PipeTabletEventTsFileBatch.java | 9 +-
.../util/builder/PipeTableModeTsFileBuilder.java | 3 +-
.../util/builder/PipeTreeModelTsFileBuilder.java | 3 +-
.../util/builder/PipeTreeModelTsFileBuilderV2.java | 196 +++++++++++++++++++++
.../connector/util/builder/PipeTsFileBuilder.java | 24 ++-
.../iotdb/db/service/metrics/WritingMetrics.java | 4 +
.../db/storageengine/dataregion/DataRegion.java | 3 +
9 files changed, 225 insertions(+), 21 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index b41283afcb2..04aa97a0d18 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -158,7 +158,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
assertEquals(results.get(0), 10);
assertEquals(results.get(1), 0);
assertEquals(results.get(2), 0);
- assertEquals(results.get(3), 1, "number of received files");
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -177,6 +176,5 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
"Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," +
database + ".d_1");
assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," +
database2 + ".d_2");
- assertEquals(results.get(3), 1, "Number of received files: resubscribe
after unsubscription");
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index 4b9bda04ead..a59dcb36242 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -202,7 +202,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceiveCount.get(), 1, "receive files 1");
+ assertGte(onReceiveCount.get(), 1, "receive files over 1");
assertEquals(rowCounts.get(0).get(), 10, device + ".s_0");
assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index ae719dce859..86451f69ecf 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -20,7 +20,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder;
-import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilder;
+import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder;
import
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter;
@@ -61,7 +61,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
super(maxDelayInMs, requestMaxBatchSizeInBytes);
final AtomicLong tsFileIdGenerator = new AtomicLong(0);
- treeModeTsFileBuilder = new PipeTreeModelTsFileBuilder(currentBatchId,
tsFileIdGenerator);
+ treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId,
tsFileIdGenerator);
}
@@ -132,7 +132,10 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
final boolean isAligned) {
new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
- totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+ // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses
PipeTreeModelTsFileBuilder as a
+ // fallback builder, so memory table writing and storing temporary tablets
require double the
+ // memory.
+ totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)
* 2;
pipeName2WeightMap.compute(
new Pair<>(pipeName, creationTime),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index 1cdb0236f57..fcb735c55fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class PipeTableModeTsFileBuilder extends
PipeTsFileBuilder {
// Try making the tsfile size as large as possible
while (!linkedHashSet.isEmpty()) {
if (Objects.isNull(fileWriter)) {
- createFileWriter();
+ fileWriter = new TsFileWriter(createFile());
}
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
index 5f14c89118b..237e294df69 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.slf4j.Logger;
@@ -143,7 +144,7 @@ public class PipeTreeModelTsFileBuilder extends
PipeTsFileBuilder {
// Try making the tsfile size as large as possible
while (!device2TabletsLinkedList.isEmpty()) {
if (Objects.isNull(fileWriter)) {
- createFileWriter();
+ fileWriter = new TsFileWriter(createFile());
}
try {
tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList,
device2Aligned);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
new file mode 100644
index 00000000000..d2d87d90f0f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util.builder;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTreeModelTsFileBuilderV2.class);
+
+ private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+ new PlanNodeId("PipeTreeModelTsFileBuilderV2");
+
+ private final List<Tablet> tabletList = new ArrayList<>();
+ private final List<Boolean> isTabletAlignedList = new ArrayList<>();
+
+ // TODO: remove me later if stable
+ private final PipeTreeModelTsFileBuilder fallbackBuilder;
+
+ public PipeTreeModelTsFileBuilderV2(
+ final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) {
+ super(currentBatchId, tsFileIdGenerator);
+ fallbackBuilder = new PipeTreeModelTsFileBuilder(currentBatchId,
tsFileIdGenerator);
+ }
+
+ @Override
+ public void bufferTableModelTablet(final String dataBase, final Tablet
tablet) {
+ throw new UnsupportedOperationException(
+ "PipeTreeModelTsFileBuilderV2 does not support table model tablet to
build TSFile");
+ }
+
+ @Override
+ public void bufferTreeModelTablet(final Tablet tablet, final Boolean
isAligned) {
+ tabletList.add(tablet);
+ isTabletAlignedList.add(isAligned);
+ fallbackBuilder.bufferTreeModelTablet(tablet, isAligned);
+ }
+
+ @Override
+ public List<Pair<String, File>> convertTabletToTsFileWithDBInfo()
+ throws IOException, WriteProcessException {
+ try {
+ return writeTabletsToTsFiles();
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Exception occurred when PipeTreeModelTsFileBuilderV2 writing
tablets to tsfile, use fallback tsfile builder: {}",
+ e.getMessage(),
+ e);
+ return fallbackBuilder.convertTabletToTsFileWithDBInfo();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return tabletList.isEmpty();
+ }
+
+ @Override
+ public void onSuccess() {
+ super.onSuccess();
+ tabletList.clear();
+ isTabletAlignedList.clear();
+ fallbackBuilder.onSuccess();
+ }
+
+ @Override
+ public synchronized void close() {
+ super.close();
+ tabletList.clear();
+ isTabletAlignedList.clear();
+ fallbackBuilder.close();
+ }
+
+ private List<Pair<String, File>> writeTabletsToTsFiles() throws
WriteProcessException {
+ final IMemTable memTable = new PrimitiveMemTable(null, null);
+ final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+ try (final RestorableTsFileIOWriter writer = new
RestorableTsFileIOWriter(createFile())) {
+ writeTabletsIntoOneFile(memTable, writer);
+ sealedFiles.add(new Pair<>(null, writer.getFile()));
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Batch id = {}: Failed to write tablets into tsfile, because {}",
+ currentBatchId.get(),
+ e.getMessage(),
+ e);
+ // TODO: handle ex
+ throw new WriteProcessException(e);
+ } finally {
+ memTable.release();
+ }
+
+ return sealedFiles;
+ }
+
+ private void writeTabletsIntoOneFile(
+ final IMemTable memTable, final RestorableTsFileIOWriter writer) throws
Exception {
+ for (int i = 0, size = tabletList.size(); i < size; ++i) {
+ final Tablet tablet = tabletList.get(i);
+
+ // convert date value to int
+ // refer to
+ //
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+ final Object[] values = tablet.getValues();
+ for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+ final IMeasurementSchema schema = tablet.getSchemas().get(j);
+ if (Objects.nonNull(schema) && Objects.equals(TSDataType.DATE,
schema.getType())) {
+ final LocalDate[] dates = ((LocalDate[]) values[j]);
+ final int[] dateValues = new int[dates.length];
+ for (int k = 0; k < Math.min(dates.length, tablet.getRowSize());
k++) {
+ dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+ }
+ values[j] = dateValues;
+ }
+ }
+
+ final InsertTabletNode insertTabletNode =
+ new InsertTabletNode(
+ PLACEHOLDER_PLAN_NODE_ID,
+ new PartialPath(tablet.getDeviceId()),
+ isTabletAlignedList.get(i),
+ tablet.getSchemas().stream()
+ .map(IMeasurementSchema::getMeasurementName)
+ .toArray(String[]::new),
+ tablet.getSchemas().stream()
+ .map(IMeasurementSchema::getType)
+ .toArray(TSDataType[]::new),
+ // TODO: cast
+ tablet.getSchemas().stream()
+ .map(schema -> (MeasurementSchema) schema)
+ .toArray(MeasurementSchema[]::new),
+ tablet.getTimestamps(),
+ tablet.getBitMaps(),
+ tablet.getValues(),
+ tablet.getRowSize());
+
+ final int start = 0;
+ final int end = insertTabletNode.getRowCount();
+
+ try {
+ if (insertTabletNode.isAligned()) {
+ memTable.insertAlignedTablet(insertTabletNode, start, end, null);
+ } else {
+ memTable.insertTablet(insertTabletNode, start, end);
+ }
+ } catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
+ throw new WriteProcessException(e);
+ }
+ }
+
+ final MemTableFlushTask memTableFlushTask = new
MemTableFlushTask(memTable, writer, null, null);
+ memTableFlushTask.syncFlushMemTable();
+
+ writer.endFile();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
index d320bff0d17..8b97cafe4c8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
@@ -145,18 +145,16 @@ public abstract class PipeTsFileBuilder {
}
}
- protected void createFileWriter() throws IOException {
- fileWriter =
- new TsFileWriter(
- new File(
- batchFileBaseDir,
- TS_FILE_PREFIX
- + "_"
- + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
- + "_"
- + currentBatchId.get()
- + "_"
- + tsFileIdGenerator.getAndIncrement()
- + TsFileConstant.TSFILE_SUFFIX));
+ protected File createFile() throws IOException {
+ return new File(
+ batchFileBaseDir,
+ TS_FILE_PREFIX
+ + "_"
+ + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ + "_"
+ + currentBatchId.get()
+ + "_"
+ + tsFileIdGenerator.getAndIncrement()
+ + TsFileConstant.TSFILE_SUFFIX);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 2e201e0c211..8d80547ab0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.metrics.utils.MetricType;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class WritingMetrics implements IMetricSet {
private static final WritingMetrics INSTANCE = new WritingMetrics();
@@ -813,6 +814,9 @@ public class WritingMetrics implements IMetricSet {
}
private DataRegionId getDataRegionIdFromStorageGroupStr(String storageGroup)
{
+ if (Objects.isNull(storageGroup)) {
+ return null;
+ }
int idx = storageGroup.lastIndexOf('-');
if (idx == -1) {
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6266a4d3129..44dc2cfbb6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2948,6 +2948,9 @@ public class DataRegion implements IDataRegionForQuery {
}
public static Optional<String> getNonSystemDatabaseName(String databaseName)
{
+ if (Objects.isNull(databaseName)) {
+ return Optional.empty();
+ }
if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
return Optional.empty();
}