This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 2cfd029f214 Pipe: use mem table to batch write table data into tsfile 
(#15373)
2cfd029f214 is described below

commit 2cfd029f2141d52acc9f10c6a95636834c444eba
Author: VGalaxies <[email protected]>
AuthorDate: Tue Apr 22 10:28:48 2025 +0800

    Pipe: use mem table to batch write table data into tsfile (#15373)
---
 .../batch/PipeTabletEventTsFileBatch.java          |   9 +-
 ...ilder.java => PipeTableModelTsFileBuilder.java} |   6 +-
 .../builder/PipeTableModelTsFileBuilderV2.java     | 259 +++++++++++++++++++++
 3 files changed, 268 insertions(+), 6 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 86451f69ecf..8eab2c6453c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
 
-import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder;
+import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModelTsFileBuilderV2;
 import 
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilderV2;
 import org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder;
 import 
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter;
@@ -62,7 +62,7 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
 
     final AtomicLong tsFileIdGenerator = new AtomicLong(0);
     treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
-    tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId, 
tsFileIdGenerator);
+    tableModeTsFileBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, 
tsFileIdGenerator);
   }
 
   @Override
@@ -148,7 +148,10 @@ public class PipeTabletEventTsFileBatch extends 
PipeTabletEventBatch {
       final String pipeName, final long creationTime, final Tablet tablet, 
final String dataBase) {
     new 
PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp();
 
-    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+    // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses 
PipeTableModelTsFileBuilder as a
+    // fallback builder, so memory table writing and storing temporary tablets 
require double the
+    // memory.
+    totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) 
* 2;
 
     pipeName2WeightMap.compute(
         new Pair<>(pipeName, creationTime),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
similarity index 98%
rename from 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
rename to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
index 20153bb1d67..4232ebab676 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
@@ -54,13 +54,13 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
+public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTableModeTsFileBuilder.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PipeTableModelTsFileBuilder.class);
 
   private final Map<String, List<Tablet>> dataBase2TabletList = new 
HashMap<>();
 
-  public PipeTableModeTsFileBuilder(AtomicLong currentBatchId, AtomicLong 
tsFileIdGenerator) {
+  public PipeTableModelTsFileBuilder(AtomicLong currentBatchId, AtomicLong 
tsFileIdGenerator) {
     super(currentBatchId, tsFileIdGenerator);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
new file mode 100644
index 00000000000..bba600df7ae
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util.builder;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * Builds TsFiles for table model tablets by inserting the buffered tablets into an in-memory
+ * {@link IMemTable} and flushing that mem table through a {@link RestorableTsFileIOWriter}, one
+ * file per database.
+ *
+ * <p>Every buffered tablet is also handed to a {@link PipeTableModelTsFileBuilder}. If the mem
+ * table based conversion throws, that fallback builder converts the same tablets instead.
+ */
+public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
+
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(PipeTableModelTsFileBuilderV2.class);
+
+  // Shared plan node id for the insert nodes that are only built to feed the mem table.
+  private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+      new PlanNodeId("PipeTableModelTsFileBuilderV2");
+
+  // Buffered tablets, grouped by database name.
+  private final Map<String, List<Tablet>> dataBase2TabletList = new HashMap<>();
+
+  // TODO: remove me later if stable
+  private final PipeTableModelTsFileBuilder fallbackBuilder;
+
+  /**
+   * Creates the builder together with its fallback builder; both share the given counters.
+   *
+   * @param currentBatchId id of the batch being built, passed to the super class and the fallback
+   * @param tsFileIdGenerator generator passed to the super class and the fallback
+   */
+  public PipeTableModelTsFileBuilderV2(
+      final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) {
+    super(currentBatchId, tsFileIdGenerator);
+    fallbackBuilder = new PipeTableModelTsFileBuilder(currentBatchId, tsFileIdGenerator);
+  }
+
+  /**
+   * Buffers a table model tablet under its database.
+   *
+   * @param dataBase name of the database the tablet belongs to
+   * @param tablet tablet to buffer
+   */
+  @Override
+  public void bufferTableModelTablet(String dataBase, Tablet tablet) {
+    dataBase2TabletList.computeIfAbsent(dataBase, db -> new ArrayList<>()).add(tablet);
+    // Mirror the tablet into the fallback builder. Without this the fallback path of
+    // convertTabletToTsFileWithDBInfo() would run on a builder that never received any tablet.
+    fallbackBuilder.bufferTableModelTablet(dataBase, tablet);
+  }
+
+  /**
+   * Always rejects the call: this builder only handles table model tablets.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public void bufferTreeModelTablet(Tablet tablet, Boolean isAligned) {
+    throw new UnsupportedOperationException(
+        "PipeTableModelTsFileBuilderV2 does not support tree model tablet to build TSFile");
+  }
+
+  /**
+   * Converts all buffered tablets into TsFiles, one file per database.
+   *
+   * @return pairs of database name and written file; an empty list if nothing is buffered
+   * @throws IOException if the fallback builder fails after the mem table based path has failed
+   */
+  @Override
+  public List<Pair<String, File>> convertTabletToTsFileWithDBInfo() throws IOException {
+    if (dataBase2TabletList.isEmpty()) {
+      return new ArrayList<>(0);
+    }
+    try {
+      final List<Pair<String, File>> pairList = new ArrayList<>();
+      for (final String dataBase : dataBase2TabletList.keySet()) {
+        pairList.addAll(writeTabletsToTsFiles(dataBase));
+      }
+      return pairList;
+    } catch (final Exception e) {
+      // Any failure of the mem table based path discards its partial result and lets the
+      // fallback builder convert the whole batch.
+      LOGGER.warn(
+          "Exception occurred when PipeTableModelTsFileBuilderV2 writing tablets to tsfile, use fallback tsfile builder: {}",
+          e.getMessage(),
+          e);
+      return fallbackBuilder.convertTabletToTsFileWithDBInfo();
+    }
+  }
+
+  /** Returns whether no tablet is currently buffered. */
+  @Override
+  public boolean isEmpty() {
+    return dataBase2TabletList.isEmpty();
+  }
+
+  /** Drops the buffered tablets of this builder and of the fallback builder. */
+  @Override
+  public synchronized void onSuccess() {
+    super.onSuccess();
+    dataBase2TabletList.clear();
+    fallbackBuilder.onSuccess();
+  }
+
+  /** Closes this builder and the fallback builder, dropping all buffered tablets. */
+  @Override
+  public synchronized void close() {
+    super.close();
+    dataBase2TabletList.clear();
+    fallbackBuilder.close();
+  }
+
+  /**
+   * Writes every tablet buffered for {@code dataBase} into a single new TsFile.
+   *
+   * @param dataBase database whose tablets are written
+   * @return a one-element list holding the database name and the written file
+   * @throws WriteProcessException wrapping any failure while creating or writing the file
+   */
+  private List<Pair<String, File>> writeTabletsToTsFiles(final String dataBase)
+      throws WriteProcessException {
+    final IMemTable memTable = new PrimitiveMemTable(null, null);
+    final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+    try (final RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(createFile())) {
+      writeTabletsIntoOneFile(dataBase, memTable, writer);
+      sealedFiles.add(new Pair<>(dataBase, writer.getFile()));
+    } catch (final Exception e) {
+      LOGGER.warn(
+          "Batch id = {}: Failed to write tablets into tsfile, because {}",
+          currentBatchId.get(),
+          e.getMessage(),
+          e);
+      // TODO: handle ex
+      throw new WriteProcessException(e);
+    } finally {
+      // Release the mem table on both the success and the failure path.
+      memTable.release();
+    }
+
+    return sealedFiles;
+  }
+
+  /**
+   * Registers the table schemas on the writer, inserts all tablets of {@code dataBase} into the
+   * mem table, flushes the mem table through the writer and ends the file.
+   *
+   * @param dataBase database whose buffered tablets are written; must have buffered tablets
+   * @param memTable mem table that receives the tablets
+   * @param writer writer of the target file
+   * @throws Exception if building an insert node, inserting, flushing or ending the file fails
+   */
+  private void writeTabletsIntoOneFile(
+      final String dataBase, final IMemTable memTable, final RestorableTsFileIOWriter writer)
+      throws Exception {
+    final List<Tablet> tabletList = Objects.requireNonNull(dataBase2TabletList.get(dataBase));
+
+    registerTableSchemas(tabletList, writer);
+
+    for (final Tablet tablet : tabletList) {
+      final List<IMeasurementSchema> schemas = tablet.getSchemas();
+      // DATE columns are converted on a copy, so the buffered tablet stays untouched for the
+      // fallback builder and for retries.
+      final Object[] values = convertDateColumnsToInt(tablet);
+
+      final RelationalInsertTabletNode insertTabletNode =
+          new RelationalInsertTabletNode(
+              PLACEHOLDER_PLAN_NODE_ID,
+              new PartialPath(tablet.getTableName()),
+              // the data of the table model is aligned
+              true,
+              schemas.stream().map(IMeasurementSchema::getMeasurementName).toArray(String[]::new),
+              schemas.stream().map(IMeasurementSchema::getType).toArray(TSDataType[]::new),
+              // TODO: cast
+              schemas.stream()
+                  .map(schema -> (MeasurementSchema) schema)
+                  .toArray(MeasurementSchema[]::new),
+              tablet.getTimestamps(),
+              tablet.getBitMaps(),
+              values,
+              tablet.getRowSize(),
+              tablet.getColumnTypes().stream()
+                  .map(TsTableColumnCategory::fromTsFileColumnCategory)
+                  .toArray(TsTableColumnCategory[]::new));
+
+      final int start = 0;
+      final int end = insertTabletNode.getRowCount();
+
+      try {
+        if (insertTabletNode.isAligned()) {
+          memTable.insertAlignedTablet(insertTabletNode, start, end, null);
+        } else {
+          memTable.insertTablet(insertTabletNode, start, end);
+        }
+      } catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
+        // Translate the datanode exception into the tsfile exception type used by this class.
+        throw new WriteProcessException(e);
+      }
+    }
+
+    final MemTableFlushTask memTableFlushTask = new MemTableFlushTask(memTable, writer, null, null);
+    memTableFlushTask.syncFlushMemTable();
+
+    writer.endFile();
+  }
+
+  /**
+   * Puts one {@link TableSchema} per table name into the writer's table schema map. The schema
+   * of a table is the concatenation of the schemas of all its tablets, keeping only the first
+   * occurrence of each equal measurement schema together with its column category.
+   */
+  private static void registerTableSchemas(
+      final List<Tablet> tabletList, final RestorableTsFileIOWriter writer) {
+    final Map<String, List<Tablet>> tableName2TabletList = new HashMap<>();
+    for (final Tablet tablet : tabletList) {
+      tableName2TabletList
+          .computeIfAbsent(tablet.getTableName(), k -> new ArrayList<>())
+          .add(tablet);
+    }
+
+    for (final Map.Entry<String, List<Tablet>> entry : tableName2TabletList.entrySet()) {
+      final String tableName = entry.getKey();
+      final List<Tablet> tablets = entry.getValue();
+
+      final List<IMeasurementSchema> aggregatedSchemas =
+          tablets.stream()
+              .flatMap(tablet -> tablet.getSchemas().stream())
+              .collect(Collectors.toList());
+      final List<ColumnCategory> aggregatedColumnCategories =
+          tablets.stream()
+              .flatMap(tablet -> tablet.getColumnTypes().stream())
+              .collect(Collectors.toList());
+
+      final Set<IMeasurementSchema> seen = new HashSet<>();
+      final List<Integer> distinctIndices =
+          IntStream.range(0, aggregatedSchemas.size())
+              // Only keep the first occurrence index
+              .filter(i -> seen.add(aggregatedSchemas.get(i)))
+              .boxed()
+              .collect(Collectors.toList());
+
+      writer
+          .getSchema()
+          .getTableSchemaMap()
+          .put(
+              tableName,
+              new TableSchema(
+                  tableName,
+                  distinctIndices.stream().map(aggregatedSchemas::get).collect(Collectors.toList()),
+                  distinctIndices.stream()
+                      .map(aggregatedColumnCategories::get)
+                      .collect(Collectors.toList())));
+    }
+  }
+
+  /**
+   * Returns a shallow copy of the tablet's value columns in which every DATE column stored as
+   * {@code LocalDate[]} is replaced by an {@code int[]} of converted dates. Null dates and rows
+   * beyond the tablet's row size keep the default value 0. The tablet itself is not modified.
+   *
+   * <p>refer to
+   * org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+   *
+   * @return the converted copy, or {@code null} if the tablet has no value array
+   */
+  private static Object[] convertDateColumnsToInt(final Tablet tablet) {
+    final Object[] originalValues = tablet.getValues();
+    if (originalValues == null) {
+      return null;
+    }
+
+    final Object[] values = originalValues.clone();
+    final List<IMeasurementSchema> schemas = tablet.getSchemas();
+    for (int j = 0; j < schemas.size(); ++j) {
+      final IMeasurementSchema schema = schemas.get(j);
+      if (Objects.nonNull(schema)
+          && Objects.equals(TSDataType.DATE, schema.getType())
+          && values[j] instanceof LocalDate[]) {
+        final LocalDate[] dates = ((LocalDate[]) values[j]);
+        final int[] dateValues = new int[dates.length];
+        final int rowCount = Math.min(dates.length, tablet.getRowSize());
+        for (int k = 0; k < rowCount; k++) {
+          if (Objects.nonNull(dates[k])) {
+            dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+          }
+        }
+        values[j] = dateValues;
+      }
+    }
+    return values;
+  }
+}

Reply via email to