This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 3e0a17f6805 Pipe: use mem table to batch write data into tsfile (#15276) (#15353)
3e0a17f6805 is described below

commit 3e0a17f6805178676096ebe2f0f0358964f43857
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:46:23 2025 +0800

    Pipe: use mem table to batch write data into tsfile (#15276) (#15353)
    
    * Pipe: use mem table to batch write tree data into tsfile (#15276)
    
    * fixup
---
 .../pattern/IoTDBTSPatternPullConsumeTsfileIT.java |   2 -
 .../IoTDBTSPatternTsfilePushConsumerIT.java        |   2 +-
 .../batch/PipeTabletEventTsFileBatch.java          | 143 +++++++++++++++++++--
 .../iotdb/db/service/metrics/WritingMetrics.java   |   4 +
 .../db/storageengine/dataregion/DataRegion.java    |   4 +
 5 files changed, 138 insertions(+), 17 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index 33d3d1aeffe..53729b3d26c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -157,7 +157,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends AbstractSubscriptionRegre
     assertEquals(results.get(0), 10);
     assertEquals(results.get(1), 0);
     assertEquals(results.get(2), 0);
-    assertEquals(results.get(3), 1, "number of received files");
     insert_data(System.currentTimeMillis());
     // Unsubscribe
     consumer.unsubscribe(topicName);
@@ -176,6 +175,5 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends AbstractSubscriptionRegre
         "Unsubscribe and resubscribe, progress is not retained. Full 
synchronization.");
     assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," + 
database + ".d_1");
     assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," + 
database2 + ".d_2");
-    assertEquals(results.get(3), 1, "Number of received files: resubscribe 
after unsubscription");
   }
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index f12000d89b6..705310a1baa 100644
--- a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -201,7 +201,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends AbstractSubscriptionRegr
 
     AWAIT.untilAsserted(
         () -> {
-          assertEquals(onReceiveCount.get(), 1, "receive files 1");
+          assertGte(onReceiveCount.get(), 1, "receive files over 1");
           assertEquals(rowCounts.get(0).get(), 10, device + ".s_0");
           assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
           assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 44bbaa87735..2344d3f5aac 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -19,12 +19,18 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
 import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
 import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
 import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -33,18 +39,22 @@ import org.apache.iotdb.session.util.RetryUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
 import org.apache.tsfile.utils.Pair;
 import org.apache.tsfile.write.TsFileWriter;
 import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -180,7 +190,9 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       final boolean isAligned) {
     new 
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
 
-    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+    // TODO: Currently, PipeTsFileBuilderV2 still uses a fallback builder, so 
memory table writing
+    // and storing temporary tablets require double the memory.
+    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) 
* 2;
 
     pipeName2WeightMap.compute(
         new Pair<>(pipeName, creationTime),
@@ -200,7 +212,18 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
   }
 
   public synchronized List<File> sealTsFiles() throws IOException, 
WriteProcessException {
-    return isClosed ? Collections.emptyList() : writeTabletsToTsFiles();
+    if (isClosed) {
+      return Collections.emptyList();
+    }
+    try {
+      return new PipeTsFileBuilderV2().writeTabletsToTsFiles();
+    } catch (org.apache.iotdb.db.exception.WriteProcessException e) {
+      LOGGER.warn(
+          "Exception occurred when PipeTsFileBuilderV2 writing tablets to 
tsfile, use fallback tsfile builder: {}",
+          e.getMessage(),
+          e);
+      return writeTabletsToTsFiles();
+    }
   }
 
   private List<File> writeTabletsToTsFiles() throws IOException, 
WriteProcessException {
@@ -248,18 +271,7 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
     // Try making the tsfile size as large as possible
     while (!device2TabletsLinkedList.isEmpty()) {
       if (Objects.isNull(fileWriter)) {
-        fileWriter =
-            new TsFileWriter(
-                new File(
-                    batchFileBaseDir,
-                    TS_FILE_PREFIX
-                        + "_"
-                        + 
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
-                        + "_"
-                        + currentBatchId.get()
-                        + "_"
-                        + tsFileIdGenerator.getAndIncrement()
-                        + TsFileConstant.TSFILE_SUFFIX));
+        fileWriter = new TsFileWriter(createFile());
       }
 
       try {
@@ -497,4 +509,107 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
       fileWriter = null;
     }
   }
+
+  protected File createFile() throws IOException {
+    return new File(
+        batchFileBaseDir,
+        TS_FILE_PREFIX
+            + "_"
+            + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+            + "_"
+            + currentBatchId.get()
+            + "_"
+            + tsFileIdGenerator.getAndIncrement()
+            + TsFileConstant.TSFILE_SUFFIX);
+  }
+
+  /////////////////////// PipeTsFileBuilderV2 //////////////////////////
+
+  private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+      new PlanNodeId("PipeTreeModelTsFileBuilderV2");
+
+  private class PipeTsFileBuilderV2 {
+
+    private List<File> writeTabletsToTsFiles()
+        throws org.apache.iotdb.db.exception.WriteProcessException {
+      final IMemTable memTable = new PrimitiveMemTable(null, null);
+      final List<File> sealedFiles = new ArrayList<>();
+      try (final RestorableTsFileIOWriter writer = new 
RestorableTsFileIOWriter(createFile())) {
+        writeTabletsIntoOneFile(memTable, writer);
+        sealedFiles.add(writer.getFile());
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Batch id = {}: Failed to write tablets into tsfile, because {}",
+            currentBatchId.get(),
+            e.getMessage(),
+            e);
+        // TODO: handle ex
+        throw new org.apache.iotdb.db.exception.WriteProcessException(e);
+      } finally {
+        memTable.release();
+      }
+
+      return sealedFiles;
+    }
+
+    private void writeTabletsIntoOneFile(
+        final IMemTable memTable, final RestorableTsFileIOWriter writer) 
throws Exception {
+      for (int i = 0, size = tabletList.size(); i < size; ++i) {
+        final Tablet tablet = tabletList.get(i);
+
+        // convert date value to int
+        // refer to
+        // 
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+        final Object[] values = tablet.values;
+        for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+          final MeasurementSchema schema = tablet.getSchemas().get(j);
+          if (Objects.nonNull(schema) && Objects.equals(TSDataType.DATE, 
schema.getType())) {
+            final LocalDate[] dates = ((LocalDate[]) values[j]);
+            final int[] dateValues = new int[dates.length];
+            for (int k = 0; k < Math.min(dates.length, tablet.rowSize); k++) {
+              dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+            }
+            values[j] = dateValues;
+          }
+        }
+
+        final InsertTabletNode insertTabletNode =
+            new InsertTabletNode(
+                PLACEHOLDER_PLAN_NODE_ID,
+                new PartialPath(tablet.deviceId),
+                isTabletAlignedList.get(i),
+                tablet.getSchemas().stream()
+                    .map(MeasurementSchema::getMeasurementId)
+                    .toArray(String[]::new),
+                tablet.getSchemas().stream()
+                    .map(MeasurementSchema::getType)
+                    .toArray(TSDataType[]::new),
+                // TODO: cast
+                tablet.getSchemas().toArray(new MeasurementSchema[0]),
+                tablet.timestamps,
+                tablet.bitMaps,
+                tablet.values,
+                tablet.rowSize);
+
+        final int start = 0;
+        final int end = insertTabletNode.getRowCount();
+
+        try {
+          if (insertTabletNode.isAligned()) {
+            memTable.insertAlignedTablet(insertTabletNode, start, end);
+          } else {
+            memTable.insertTablet(insertTabletNode, start, end);
+          }
+        } catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
+          throw new org.apache.iotdb.db.exception.WriteProcessException(e);
+        }
+      }
+
+      final MemTableFlushTask memTableFlushTask =
+          new MemTableFlushTask(memTable, writer, null, null);
+      memTableFlushTask.syncFlushMemTable();
+
+      writer.endFile();
+    }
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 770e6c485ea..d3cda241a9f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.metrics.utils.MetricType;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class WritingMetrics implements IMetricSet {
   private static final WritingMetrics INSTANCE = new WritingMetrics();
@@ -813,6 +814,9 @@ public class WritingMetrics implements IMetricSet {
   }
 
   private DataRegionId getDataRegionIdFromStorageGroupStr(String storageGroup) 
{
+    if (Objects.isNull(storageGroup)) {
+      return null;
+    }
     int idx = storageGroup.lastIndexOf('-');
     if (idx == -1) {
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 961968d601f..a734112c56e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -153,6 +153,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
@@ -2863,6 +2864,9 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   public static Optional<String> getNonSystemDatabaseName(String databaseName) 
{
+    if (Objects.isNull(databaseName)) {
+      return Optional.empty();
+    }
     if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
       return Optional.empty();
     }

Reply via email to