This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new a84bee756a5 Pipe: use mem table to batch write tree data into tsfile (#15276)
a84bee756a5 is described below

commit a84bee756a580fd1d0a9b4264da193bae6fcacc8
Author: VGalaxies <[email protected]>
AuthorDate: Thu Apr 17 12:21:36 2025 +0800

    Pipe: use mem table to batch write tree data into tsfile (#15276)
---
 .../pattern/IoTDBTSPatternPullConsumeTsfileIT.java |   2 -
 .../IoTDBTSPatternTsfilePushConsumerIT.java        |   2 +-
 .../batch/PipeTabletEventTsFileBatch.java          |   9 +-
 .../util/builder/PipeTableModeTsFileBuilder.java   |   3 +-
 .../util/builder/PipeTreeModelTsFileBuilder.java   |   3 +-
 .../util/builder/PipeTreeModelTsFileBuilderV2.java | 196 +++++++++++++++++++++
 .../connector/util/builder/PipeTsFileBuilder.java  |  24 ++-
 .../iotdb/db/service/metrics/WritingMetrics.java   |   4 +
 .../db/storageengine/dataregion/DataRegion.java    |   3 +
 9 files changed, 225 insertions(+), 21 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index b41283afcb2..04aa97a0d18 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -158,7 +158,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends AbstractSubscriptionRegre
     assertEquals(results.get(0), 10);
     assertEquals(results.get(1), 0);
     assertEquals(results.get(2), 0);
-    assertEquals(results.get(3), 1, "number of received files");
     insert_data(System.currentTimeMillis());
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -177,6 +176,5 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends AbstractSubscriptionRegre
         "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
     assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," + 
database + ".d_1");
     assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," + 
database2 + ".d_2");
-    assertEquals(results.get(3), 1, "Number of received files: resubscribe 
after unsubscription");
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index 4b9bda04ead..a59dcb36242 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -202,7 +202,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceiveCount.get(), 1, "receive files 1");
+          assertGte(onReceiveCount.get(), 1, "receive files over 1");
           assertEquals(rowCounts.get(0).get(), 10, device + ".s_0");
           assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
           assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index ae719dce859..86451f69ecf 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
 import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder;
-import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilder;
+import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilderV2;
 import org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder;
 import 
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTreeModelTabletEventSorter;
@@ -61,7 +61,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
     super(maxDelayInMs, requestMaxBatchSizeInBytes);
 
     final AtomicLong tsFileIdGenerator = new AtomicLong(0);
-    treeModeTsFileBuilder = new PipeTreeModelTsFileBuilder(currentBatchId, 
tsFileIdGenerator);
+    treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
     tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId, 
tsFileIdGenerator);
   }
 
@@ -132,7 +132,10 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       final boolean isAligned) {
     new 
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
-    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+    // TODO: Currently, PipeTreeModelTsFileBuilderV2 still uses 
PipeTreeModelTsFileBuilder as a
+    // fallback builder, so memory table writing and storing temporary tablets 
require double the
+    // memory.
+    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) 
* 2;
 
     pipeName2WeightMap.compute(
         new Pair<>(pipeName, creationTime),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
index 1cdb0236f57..fcb735c55fc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
@@ -27,6 +27,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.utils.WriteUtils;
+import org.apache.tsfile.write.TsFileWriter;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
@@ -142,7 +143,7 @@ public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
     // Try making the tsfile size as large as possible
     while (!linkedHashSet.isEmpty()) {
       if (Objects.isNull(fileWriter)) {
-        createFileWriter();
+        fileWriter = new TsFileWriter(createFile());
       }
 
       try {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
index 5f14c89118b..237e294df69 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilder.java
@@ -25,6 +25,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.BitMap;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.TsFileWriter;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.slf4j.Logger;
@@ -143,7 +144,7 @@ public class PipeTreeModelTsFileBuilder extends PipeTsFileBuilder {
     // Try making the tsfile size as large as possible
     while (!device2TabletsLinkedList.isEmpty()) {
       if (Objects.isNull(fileWriter)) {
-        createFileWriter();
+        fileWriter = new TsFileWriter(createFile());
       }
       try {
         tryBestToWriteTabletsIntoOneFile(device2TabletsLinkedList, 
device2Aligned);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
new file mode 100644
index 00000000000..d2d87d90f0f
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTreeModelTsFileBuilderV2.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util.builder;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class PipeTreeModelTsFileBuilderV2 extends PipeTsFileBuilder {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTreeModelTsFileBuilderV2.class);
+
+  private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+      new PlanNodeId("PipeTreeModelTsFileBuilderV2");
+
+  private final List<Tablet> tabletList = new ArrayList<>();
+  private final List<Boolean> isTabletAlignedList = new ArrayList<>();
+
+  // TODO: remove me later if stable
+  private final PipeTreeModelTsFileBuilder fallbackBuilder;
+
+  public PipeTreeModelTsFileBuilderV2(
+      final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) {
+    super(currentBatchId, tsFileIdGenerator);
+    fallbackBuilder = new PipeTreeModelTsFileBuilder(currentBatchId, 
tsFileIdGenerator);
+  }
+
+  @Override
+  public void bufferTableModelTablet(final String dataBase, final Tablet 
tablet) {
+    throw new UnsupportedOperationException(
+        "PipeTreeModelTsFileBuilderV2 does not support table model tablet to 
build TSFile");
+  }
+
+  @Override
+  public void bufferTreeModelTablet(final Tablet tablet, final Boolean 
isAligned) {
+    tabletList.add(tablet);
+    isTabletAlignedList.add(isAligned);
+    fallbackBuilder.bufferTreeModelTablet(tablet, isAligned);
+  }
+
+  @Override
+  public List<Pair<String, File>> convertTabletToTsFileWithDBInfo()
+      throws IOException, WriteProcessException {
+    try {
+      return writeTabletsToTsFiles();
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Exception occurred when PipeTreeModelTsFileBuilderV2 writing 
tablets to tsfile, use fallback tsfile builder: {}",
+          e.getMessage(),
+          e);
+      return fallbackBuilder.convertTabletToTsFileWithDBInfo();
+    }
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return tabletList.isEmpty();
+  }
+
+  @Override
+  public void onSuccess() {
+    super.onSuccess();
+    tabletList.clear();
+    isTabletAlignedList.clear();
+    fallbackBuilder.onSuccess();
+  }
+
+  @Override
+  public synchronized void close() {
+    super.close();
+    tabletList.clear();
+    isTabletAlignedList.clear();
+    fallbackBuilder.close();
+  }
+
+  private List<Pair<String, File>> writeTabletsToTsFiles() throws 
WriteProcessException {
+    final IMemTable memTable = new PrimitiveMemTable(null, null);
+    final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+    try (final RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(createFile())) {
+      writeTabletsIntoOneFile(memTable, writer);
+      sealedFiles.add(new Pair<>(null, writer.getFile()));
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Batch id = {}: Failed to write tablets into tsfile, because {}",
+          currentBatchId.get(),
+          e.getMessage(),
+          e);
+      // TODO: handle ex
+      throw new WriteProcessException(e);
+    } finally {
+      memTable.release();
+    }
+
+    return sealedFiles;
+  }
+
+  private void writeTabletsIntoOneFile(
+      final IMemTable memTable, final RestorableTsFileIOWriter writer) throws 
Exception {
+    for (int i = 0, size = tabletList.size(); i < size; ++i) {
+      final Tablet tablet = tabletList.get(i);
+
+      // convert date value to int
+      // refer to
+      // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+      final Object[] values = tablet.getValues();
+      for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+        final IMeasurementSchema schema = tablet.getSchemas().get(j);
+        if (Objects.nonNull(schema) && Objects.equals(TSDataType.DATE, 
schema.getType())) {
+          final LocalDate[] dates = ((LocalDate[]) values[j]);
+          final int[] dateValues = new int[dates.length];
+          for (int k = 0; k < Math.min(dates.length, tablet.getRowSize()); 
k++) {
+            dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+          }
+          values[j] = dateValues;
+        }
+      }
+
+      final InsertTabletNode insertTabletNode =
+          new InsertTabletNode(
+              PLACEHOLDER_PLAN_NODE_ID,
+              new PartialPath(tablet.getDeviceId()),
+              isTabletAlignedList.get(i),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getMeasurementName)
+                  .toArray(String[]::new),
+              tablet.getSchemas().stream()
+                  .map(IMeasurementSchema::getType)
+                  .toArray(TSDataType[]::new),
+              // TODO: cast
+              tablet.getSchemas().stream()
+                  .map(schema -> (MeasurementSchema) schema)
+                  .toArray(MeasurementSchema[]::new),
+              tablet.getTimestamps(),
+              tablet.getBitMaps(),
+              tablet.getValues(),
+              tablet.getRowSize());
+
+      final int start = 0;
+      final int end = insertTabletNode.getRowCount();
+
+      try {
+        if (insertTabletNode.isAligned()) {
+          memTable.insertAlignedTablet(insertTabletNode, start, end, null);
+        } else {
+          memTable.insertTablet(insertTabletNode, start, end);
+        }
+      } catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
+        throw new WriteProcessException(e);
+      }
+    }
+
+    final MemTableFlushTask memTableFlushTask = new 
MemTableFlushTask(memTable, writer, null, null);
+    memTableFlushTask.syncFlushMemTable();
+
+    writer.endFile();
+  }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
index d320bff0d17..8b97cafe4c8 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java
@@ -145,18 +145,16 @@ public abstract class PipeTsFileBuilder {
     }
   }
 
-  protected void createFileWriter() throws IOException {
-    fileWriter =
-        new TsFileWriter(
-            new File(
-                batchFileBaseDir,
-                TS_FILE_PREFIX
-                    + "_"
-                    + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
-                    + "_"
-                    + currentBatchId.get()
-                    + "_"
-                    + tsFileIdGenerator.getAndIncrement()
-                    + TsFileConstant.TSFILE_SUFFIX));
+  protected File createFile() throws IOException {
+    return new File(
+        batchFileBaseDir,
+        TS_FILE_PREFIX
+            + "_"
+            + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+            + "_"
+            + currentBatchId.get()
+            + "_"
+            + tsFileIdGenerator.getAndIncrement()
+            + TsFileConstant.TSFILE_SUFFIX);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 2e201e0c211..8d80547ab0b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.metrics.utils.MetricType;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class WritingMetrics implements IMetricSet {
   private static final WritingMetrics INSTANCE = new WritingMetrics();
@@ -813,6 +814,9 @@ public class WritingMetrics implements IMetricSet {
   }
 
   private DataRegionId getDataRegionIdFromStorageGroupStr(String storageGroup) 
{
+    if (Objects.isNull(storageGroup)) {
+      return null;
+    }
     int idx = storageGroup.lastIndexOf('-');
     if (idx == -1) {
       return null;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 6266a4d3129..44dc2cfbb6d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -2948,6 +2948,9 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public static Optional<String> getNonSystemDatabaseName(String databaseName) 
{
+    if (Objects.isNull(databaseName)) {
+      return Optional.empty();
+    }
     if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
       return Optional.empty();
     }

Reply via email to