This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2cfd029f214 Pipe: use mem table to batch write table data into tsfile (#15373)
2cfd029f214 is described below
commit 2cfd029f2141d52acc9f10c6a95636834c444eba
Author: VGalaxies <[email protected]>
AuthorDate: Tue Apr 22 10:28:48 2025 +0800
Pipe: use mem table to batch write table data into tsfile (#15373)
---
.../batch/PipeTabletEventTsFileBatch.java | 9 +-
...ilder.java => PipeTableModelTsFileBuilder.java} | 6 +-
.../builder/PipeTableModelTsFileBuilderV2.java | 259 +++++++++++++++++++++
3 files changed, 268 insertions(+), 6 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 86451f69ecf..8eab2c6453c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
-import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModeTsFileBuilder;
+import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTableModelTsFileBuilderV2;
import
org.apache.iotdb.db.pipe.connector.util.builder.PipeTreeModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.connector.util.builder.PipeTsFileBuilder;
import
org.apache.iotdb.db.pipe.connector.util.sorter.PipeTableModelTabletEventSorter;
@@ -62,7 +62,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
final AtomicLong tsFileIdGenerator = new AtomicLong(0);
treeModeTsFileBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
- tableModeTsFileBuilder = new PipeTableModeTsFileBuilder(currentBatchId,
tsFileIdGenerator);
+ tableModeTsFileBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId,
tsFileIdGenerator);
}
@Override
@@ -148,7 +148,10 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
final String pipeName, final long creationTime, final Tablet tablet,
final String dataBase) {
new
PipeTableModelTabletEventSorter(tablet).sortAndDeduplicateByDevIdTimestamp();
- totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+ // TODO: Currently, PipeTableModelTsFileBuilderV2 still uses
PipeTableModelTsFileBuilder as a
+ // fallback builder, so memory table writing and storing temporary tablets
require double the
+ // memory.
+ totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)
* 2;
pipeName2WeightMap.compute(
new Pair<>(pipeName, creationTime),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
similarity index 98%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
index 20153bb1d67..4232ebab676 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModeTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilder.java
@@ -54,13 +54,13 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-public class PipeTableModeTsFileBuilder extends PipeTsFileBuilder {
+public class PipeTableModelTsFileBuilder extends PipeTsFileBuilder {
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTableModeTsFileBuilder.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTableModelTsFileBuilder.class);
private final Map<String, List<Tablet>> dataBase2TabletList = new
HashMap<>();
- public PipeTableModeTsFileBuilder(AtomicLong currentBatchId, AtomicLong
tsFileIdGenerator) {
+ public PipeTableModelTsFileBuilder(AtomicLong currentBatchId, AtomicLong
tsFileIdGenerator) {
super(currentBatchId, tsFileIdGenerator);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
new file mode 100644
index 00000000000..bba600df7ae
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTableModelTsFileBuilderV2.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.connector.util.builder;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.TableSchema;
+import org.apache.tsfile.utils.DateUtils;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.record.Tablet.ColumnCategory;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+public class PipeTableModelTsFileBuilderV2 extends PipeTsFileBuilder {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PipeTableModelTsFileBuilderV2.class);
+
+ private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+ new PlanNodeId("PipeTableModelTsFileBuilderV2");
+
+ private final Map<String, List<Tablet>> dataBase2TabletList = new
HashMap<>();
+
+ // TODO: remove me later if stable
+ private final PipeTableModelTsFileBuilder fallbackBuilder;
+
+ public PipeTableModelTsFileBuilderV2(
+ final AtomicLong currentBatchId, final AtomicLong tsFileIdGenerator) {
+ super(currentBatchId, tsFileIdGenerator);
+ fallbackBuilder = new PipeTableModelTsFileBuilder(currentBatchId,
tsFileIdGenerator);
+ }
+
+ @Override
+ public void bufferTableModelTablet(String dataBase, Tablet tablet) {
+ dataBase2TabletList.computeIfAbsent(dataBase, db -> new
ArrayList<>()).add(tablet);
+ }
+
+ @Override
+ public void bufferTreeModelTablet(Tablet tablet, Boolean isAligned) {
+ throw new UnsupportedOperationException(
+ "PipeTableModeTsFileBuilderV2 does not support tree model tablet to
build TSFile");
+ }
+
+ @Override
+ public List<Pair<String, File>> convertTabletToTsFileWithDBInfo() throws
IOException {
+ if (dataBase2TabletList.isEmpty()) {
+ return new ArrayList<>(0);
+ }
+ try {
+ final List<Pair<String, File>> pairList = new ArrayList<>();
+ for (final String dataBase : dataBase2TabletList.keySet()) {
+ pairList.addAll(writeTabletsToTsFiles(dataBase));
+ }
+ return pairList;
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Exception occurred when PipeTableModelTsFileBuilderV2 writing
tablets to tsfile, use fallback tsfile builder: {}",
+ e.getMessage(),
+ e);
+ return fallbackBuilder.convertTabletToTsFileWithDBInfo();
+ }
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return dataBase2TabletList.isEmpty();
+ }
+
+ @Override
+ public synchronized void onSuccess() {
+ super.onSuccess();
+ dataBase2TabletList.clear();
+ fallbackBuilder.onSuccess();
+ }
+
+ @Override
+ public synchronized void close() {
+ super.close();
+ dataBase2TabletList.clear();
+ fallbackBuilder.close();
+ }
+
+ private List<Pair<String, File>> writeTabletsToTsFiles(final String dataBase)
+ throws WriteProcessException {
+ final IMemTable memTable = new PrimitiveMemTable(null, null);
+ final List<Pair<String, File>> sealedFiles = new ArrayList<>();
+ try (final RestorableTsFileIOWriter writer = new
RestorableTsFileIOWriter(createFile())) {
+ writeTabletsIntoOneFile(dataBase, memTable, writer);
+ sealedFiles.add(new Pair<>(dataBase, writer.getFile()));
+ } catch (final Exception e) {
+ LOGGER.warn(
+ "Batch id = {}: Failed to write tablets into tsfile, because {}",
+ currentBatchId.get(),
+ e.getMessage(),
+ e);
+ // TODO: handle ex
+ throw new WriteProcessException(e);
+ } finally {
+ memTable.release();
+ }
+
+ return sealedFiles;
+ }
+
+ private void writeTabletsIntoOneFile(
+ final String dataBase, final IMemTable memTable, final
RestorableTsFileIOWriter writer)
+ throws Exception {
+ final List<Tablet> tabletList =
Objects.requireNonNull(dataBase2TabletList.get(dataBase));
+
+ final Map<String, List<Tablet>> tableName2TabletList = new HashMap<>();
+ for (final Tablet tablet : tabletList) {
+ tableName2TabletList
+ .computeIfAbsent(tablet.getTableName(), k -> new ArrayList<>())
+ .add(tablet);
+ }
+
+ for (Map.Entry<String, List<Tablet>> entry :
tableName2TabletList.entrySet()) {
+ final String tableName = entry.getKey();
+ final List<Tablet> tablets = entry.getValue();
+
+ List<IMeasurementSchema> aggregatedSchemas =
+ tablets.stream()
+ .flatMap(tablet -> tablet.getSchemas().stream())
+ .collect(Collectors.toList());
+ List<ColumnCategory> aggregatedColumnCategories =
+ tablets.stream()
+ .flatMap(tablet -> tablet.getColumnTypes().stream())
+ .collect(Collectors.toList());
+
+ final Set<IMeasurementSchema> seen = new HashSet<>();
+ final List<Integer> distinctIndices =
+ IntStream.range(0, aggregatedSchemas.size())
+ .filter(
+ i -> seen.add(aggregatedSchemas.get(i))) // Only keep the
first occurrence index
+ .boxed()
+ .collect(Collectors.toList());
+
+ writer
+ .getSchema()
+ .getTableSchemaMap()
+ .put(
+ tableName,
+ new TableSchema(
+ tableName,
+
distinctIndices.stream().map(aggregatedSchemas::get).collect(Collectors.toList()),
+ distinctIndices.stream()
+ .map(aggregatedColumnCategories::get)
+ .collect(Collectors.toList())));
+ }
+
+ for (int i = 0, size = tabletList.size(); i < size; ++i) {
+ final Tablet tablet = tabletList.get(i);
+
+ // convert date value to int
+ // refer to
+ //
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+ final Object[] values = tablet.getValues();
+ for (int j = 0; j < tablet.getSchemas().size(); ++j) {
+ final IMeasurementSchema schema = tablet.getSchemas().get(j);
+ if (Objects.nonNull(schema)
+ && Objects.equals(TSDataType.DATE, schema.getType())
+ && values[j] instanceof LocalDate[]) {
+ final LocalDate[] dates = ((LocalDate[]) values[j]);
+ final int[] dateValues = new int[dates.length];
+ for (int k = 0; k < Math.min(dates.length, tablet.getRowSize());
k++) {
+ if (Objects.nonNull(dates[k])) {
+ dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+ }
+ }
+ values[j] = dateValues;
+ }
+ }
+
+ final RelationalInsertTabletNode insertTabletNode =
+ new RelationalInsertTabletNode(
+ PLACEHOLDER_PLAN_NODE_ID,
+ new PartialPath(tablet.getTableName()),
+ // the data of the table model is aligned
+ true,
+ tablet.getSchemas().stream()
+ .map(IMeasurementSchema::getMeasurementName)
+ .toArray(String[]::new),
+ tablet.getSchemas().stream()
+ .map(IMeasurementSchema::getType)
+ .toArray(TSDataType[]::new),
+ // TODO: cast
+ tablet.getSchemas().stream()
+ .map(schema -> (MeasurementSchema) schema)
+ .toArray(MeasurementSchema[]::new),
+ tablet.getTimestamps(),
+ tablet.getBitMaps(),
+ tablet.getValues(),
+ tablet.getRowSize(),
+ tablet.getColumnTypes().stream()
+ .map(TsTableColumnCategory::fromTsFileColumnCategory)
+ .toArray(TsTableColumnCategory[]::new));
+
+ final int start = 0;
+ final int end = insertTabletNode.getRowCount();
+
+ try {
+ if (insertTabletNode.isAligned()) {
+ memTable.insertAlignedTablet(insertTabletNode, start, end, null);
+ } else {
+ memTable.insertTablet(insertTabletNode, start, end);
+ }
+ } catch (final org.apache.iotdb.db.exception.WriteProcessException e) {
+ throw new WriteProcessException(e);
+ }
+ }
+
+ final MemTableFlushTask memTableFlushTask = new
MemTableFlushTask(memTable, writer, null, null);
+ memTableFlushTask.syncFlushMemTable();
+
+ writer.endFile();
+ }
+}