This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 3e0a17f6805 Pipe: use mem table to batch write data into tsfile
(#15276) (#15353)
3e0a17f6805 is described below
commit 3e0a17f6805178676096ebe2f0f0358964f43857
Author: VGalaxies <[email protected]>
AuthorDate: Mon Apr 21 18:46:23 2025 +0800
Pipe: use mem table to batch write data into tsfile (#15276) (#15353)
* Pipe: use mem table to batch write tree data into tsfile (#15276)
* fixup
---
.../pattern/IoTDBTSPatternPullConsumeTsfileIT.java | 2 -
.../IoTDBTSPatternTsfilePushConsumerIT.java | 2 +-
.../batch/PipeTabletEventTsFileBatch.java | 143 +++++++++++++++++++--
.../iotdb/db/service/metrics/WritingMetrics.java | 4 +
.../db/storageengine/dataregion/DataRegion.java | 4 +
5 files changed, 138 insertions(+), 17 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
index 33d3d1aeffe..53729b3d26c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pullconsumer/pattern/IoTDBTSPatternPullConsumeTsfileIT.java
@@ -157,7 +157,6 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
assertEquals(results.get(0), 10);
assertEquals(results.get(1), 0);
assertEquals(results.get(2), 0);
- assertEquals(results.get(3), 1, "number of received files");
insert_data(System.currentTimeMillis());
// Unsubscribe
consumer.unsubscribe(topicName);
@@ -176,6 +175,5 @@ public class IoTDBTSPatternPullConsumeTsfileIT extends
AbstractSubscriptionRegre
"Unsubscribe and resubscribe, progress is not retained. Full
synchronization.");
assertEquals(results.get(1), 0, "Subscribe again after unsubscribe," +
database + ".d_1");
assertEquals(results.get(2), 0, "Unsubscribe and then subscribe again," +
database2 + ".d_2");
- assertEquals(results.get(3), 1, "Number of received files: resubscribe
after unsubscription");
}
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
index f12000d89b6..705310a1baa 100644
---
a/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/subscription/it/triple/regression/pushconsumer/pattern/IoTDBTSPatternTsfilePushConsumerIT.java
@@ -201,7 +201,7 @@ public class IoTDBTSPatternTsfilePushConsumerIT extends
AbstractSubscriptionRegr
AWAIT.untilAsserted(
() -> {
- assertEquals(onReceiveCount.get(), 1, "receive files 1");
+ assertGte(onReceiveCount.get(), 1, "receive files over 1");
assertEquals(rowCounts.get(0).get(), 10, device + ".s_0");
assertEquals(rowCounts.get(1).get(), 0, device + ".s_1");
assertEquals(rowCounts.get(2).get(), 0, database + ".d_1.s_0");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 44bbaa87735..2344d3f5aac 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -19,12 +19,18 @@
package org.apache.iotdb.db.pipe.connector.payload.evolvable.batch;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.pipe.connector.util.PipeTabletEventSorter;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
+import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -33,18 +39,22 @@ import org.apache.iotdb.session.util.RetryUtils;
import org.apache.commons.io.FileUtils;
import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.utils.DateUtils;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.TsFileWriter;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.writer.RestorableTsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -180,7 +190,9 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
final boolean isAligned) {
new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
- totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet);
+ // TODO: Currently, PipeTsFileBuilderV2 still uses a fallback builder, so
memory table writing
+ // and storing temporary tablets require double the memory.
+ totalBufferSize += PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet)
* 2;
pipeName2WeightMap.compute(
new Pair<>(pipeName, creationTime),
@@ -200,7 +212,18 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
}
/**
 * Seals the buffered tablets into tsfile(s) and returns the resulting files.
 *
 * <p>Returns an empty list when this batch is already closed. Otherwise the
 * {@code PipeTsFileBuilderV2} is tried first; if it throws
 * {@code org.apache.iotdb.db.exception.WriteProcessException}, the failure is logged at WARN
 * level and {@code writeTabletsToTsFiles()} is used instead.
 *
 * @return the sealed files, or an empty list if the batch is closed
 * @throws IOException propagated from {@code writeTabletsToTsFiles()}
 * @throws WriteProcessException propagated from {@code writeTabletsToTsFiles()}
 */
public synchronized List<File> sealTsFiles() throws IOException,
WriteProcessException {
- return isClosed ? Collections.emptyList() : writeTabletsToTsFiles();
+ if (isClosed) {
+ return Collections.emptyList();
+ }
+ try {
+ return new PipeTsFileBuilderV2().writeTabletsToTsFiles();
+ } catch (org.apache.iotdb.db.exception.WriteProcessException e) {
+ LOGGER.warn(
+ "Exception occurred when PipeTsFileBuilderV2 writing tablets to
tsfile, use fallback tsfile builder: {}",
+ e.getMessage(),
+ e);
+ return writeTabletsToTsFiles(); // V2 failed: retry with the fallback builder
+ }
}
private List<File> writeTabletsToTsFiles() throws IOException,
WriteProcessException {
@@ -248,18 +271,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
// Try making the tsfile size as large as possible
while (!device2TabletsLinkedList.isEmpty()) {
if (Objects.isNull(fileWriter)) {
- fileWriter =
- new TsFileWriter(
- new File(
- batchFileBaseDir,
- TS_FILE_PREFIX
- + "_"
- +
IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
- + "_"
- + currentBatchId.get()
- + "_"
- + tsFileIdGenerator.getAndIncrement()
- + TsFileConstant.TSFILE_SUFFIX));
+ fileWriter = new TsFileWriter(createFile());
}
try {
@@ -497,4 +509,107 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
fileWriter = null;
}
}
+
+ protected File createFile() throws IOException {
+ return new File(
+ batchFileBaseDir,
+ TS_FILE_PREFIX
+ + "_"
+ + IoTDBDescriptor.getInstance().getConfig().getDataNodeId()
+ + "_"
+ + currentBatchId.get()
+ + "_"
+ + tsFileIdGenerator.getAndIncrement()
+ + TsFileConstant.TSFILE_SUFFIX);
+ }
+
+ /////////////////////// PipeTsFileBuilderV2 //////////////////////////
+
+ private static final PlanNodeId PLACEHOLDER_PLAN_NODE_ID =
+ new PlanNodeId("PipeTreeModelTsFileBuilderV2"); // plan node id passed to each InsertTabletNode built by PipeTsFileBuilderV2
+
+  /**
+   * Builds one tsfile from the buffered tablets by inserting them into a temporary mem table
+   * and flushing that mem table through a {@link RestorableTsFileIOWriter}.
+   *
+   * <p>The builder never modifies the buffered tablets, so the enclosing batch can still hand
+   * them to the fallback builder (or retry) after a failure here.
+   */
+  private class PipeTsFileBuilderV2 {
+
+    /**
+     * Writes every buffered tablet into a single newly created tsfile.
+     *
+     * @return a list holding the sealed tsfile
+     * @throws org.apache.iotdb.db.exception.WriteProcessException wrapping any failure raised
+     *     while creating, filling or sealing the file
+     */
+    private List<File> writeTabletsToTsFiles()
+        throws org.apache.iotdb.db.exception.WriteProcessException {
+      final IMemTable memTable = new PrimitiveMemTable(null, null);
+      final List<File> sealedFiles = new ArrayList<>();
+      try (final RestorableTsFileIOWriter writer = new RestorableTsFileIOWriter(createFile())) {
+        writeTabletsIntoOneFile(memTable, writer);
+        sealedFiles.add(writer.getFile());
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Batch id = {}: Failed to write tablets into tsfile, because {}",
+            currentBatchId.get(),
+            e.getMessage(),
+            e);
+        // TODO: handle ex
+        throw new org.apache.iotdb.db.exception.WriteProcessException(e);
+      } finally {
+        // The mem table is only a staging area; release it whether or not the flush succeeded.
+        memTable.release();
+      }
+
+      return sealedFiles;
+    }
+
+    /**
+     * Inserts all buffered tablets into {@code memTable}, flushes it synchronously through
+     * {@code writer} and ends the file.
+     *
+     * @param memTable staging mem table that receives the tablets
+     * @param writer writer of the target tsfile
+     * @throws Exception if an insertion, the flush or sealing the file fails
+     */
+    private void writeTabletsIntoOneFile(
+        final IMemTable memTable, final RestorableTsFileIOWriter writer) throws Exception {
+      for (int i = 0, size = tabletList.size(); i < size; ++i) {
+        final Tablet tablet = tabletList.get(i);
+        final List<MeasurementSchema> schemas = tablet.getSchemas();
+
+        // convert date value to int
+        // refer to
+        // org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
+        //
+        // The conversion works on a shallow copy of the column array: writing the int[] back
+        // into tablet.values would corrupt the tablet for the fallback builder and for any
+        // second pass, both of which expect LocalDate[] for DATE columns.
+        final Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
+        for (int j = 0; j < schemas.size(); ++j) {
+          final MeasurementSchema schema = schemas.get(j);
+          if (Objects.nonNull(schema)
+              && Objects.equals(TSDataType.DATE, schema.getType())
+              && values[j] instanceof LocalDate[]) {
+            final LocalDate[] dates = (LocalDate[]) values[j];
+            final int[] dateValues = new int[dates.length];
+            for (int k = 0; k < Math.min(dates.length, tablet.rowSize); k++) {
+              // Unset slots stay 0 instead of passing null to DateUtils.
+              if (Objects.nonNull(dates[k])) {
+                dateValues[k] = DateUtils.parseDateExpressionToInt(dates[k]);
+              }
+            }
+            values[j] = dateValues;
+          }
+        }
+
+        final InsertTabletNode insertTabletNode =
+            new InsertTabletNode(
+                PLACEHOLDER_PLAN_NODE_ID,
+                new PartialPath(tablet.deviceId),
+                isTabletAlignedList.get(i),
+                schemas.stream().map(MeasurementSchema::getMeasurementId).toArray(String[]::new),
+                schemas.stream().map(MeasurementSchema::getType).toArray(TSDataType[]::new),
+                // TODO: cast
+                schemas.toArray(new MeasurementSchema[0]),
+                tablet.timestamps,
+                tablet.bitMaps,
+                values,
+                tablet.rowSize);
+
+        final int start = 0;
+        final int end = insertTabletNode.getRowCount();
+
+        // Insertion failures propagate unchanged; the caller wraps them once.
+        if (insertTabletNode.isAligned()) {
+          memTable.insertAlignedTablet(insertTabletNode, start, end);
+        } else {
+          memTable.insertTablet(insertTabletNode, start, end);
+        }
+      }
+
+      final MemTableFlushTask memTableFlushTask =
+          new MemTableFlushTask(memTable, writer, null, null);
+      memTableFlushTask.syncFlushMemTable();
+
+      writer.endFile();
+    }
+  }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 770e6c485ea..d3cda241a9f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.metrics.utils.MetricType;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
public class WritingMetrics implements IMetricSet {
private static final WritingMetrics INSTANCE = new WritingMetrics();
@@ -813,6 +814,9 @@ public class WritingMetrics implements IMetricSet {
}
private DataRegionId getDataRegionIdFromStorageGroupStr(String storageGroup)
{
+ if (Objects.isNull(storageGroup)) {
+ return null;
+ }
int idx = storageGroup.lastIndexOf('-');
if (idx == -1) {
return null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 961968d601f..a734112c56e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -153,6 +153,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -2863,6 +2864,9 @@ public class DataRegion implements IDataRegionForQuery {
}
public static Optional<String> getNonSystemDatabaseName(String databaseName)
{
+ if (Objects.isNull(databaseName)) {
+ return Optional.empty();
+ }
if (databaseName.startsWith(SchemaConstant.SYSTEM_DATABASE)) {
return Optional.empty();
}