This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 3f207172056 Fix aggregate write-back output database metadata (#17938)
3f207172056 is described below
commit 3f20717205688be0e3d98b7911bfc4284a1d95d3
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:14:15 2026 +0800
Fix aggregate write-back output database metadata (#17938)
* Fix aggregate write-back output database metadata
* Fixed i18n
---
.../db/pipe/event/common/row/PipeRowCollector.java | 24 +++++++++----
.../common/tablet/PipeRawTabletEventConverter.java | 42 ++++++++++++++++++----
.../event/common/tablet/PipeTabletCollector.java | 23 +++++++++---
.../processor/aggregate/AggregateProcessor.java | 13 ++++++-
.../pipe/event/PipeTabletInsertionEventTest.java | 32 +++++++++++++++++
.../apache/iotdb/commons/i18n/CommonMessages.java | 8 +++++
.../apache/iotdb/commons/i18n/CommonMessages.java | 8 +++++
.../iotdb/commons/partition/DataPartition.java | 18 ++++++----
8 files changed, 143 insertions(+), 25 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 326d9c7d31e..efd442fe428 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.event.common.row;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
-import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
@@ -56,6 +55,22 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
}
+ public PipeRowCollector(
+ PipeTaskMeta pipeTaskMeta,
+ EnrichedEvent sourceEvent,
+ String sourceEventDataBase,
+ Boolean isTableModel,
+ String rawTableModelDataBaseName,
+ String rawTreeModelDataBaseName) {
+ super(
+ pipeTaskMeta,
+ sourceEvent,
+ sourceEventDataBase,
+ isTableModel,
+ rawTableModelDataBaseName,
+ rawTreeModelDataBaseName);
+ }
+
@Override
public void collectRow(Row row) {
if (!(row instanceof PipeRow)) {
@@ -106,15 +121,12 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
private void collectTabletInsertionEvent() {
if (tablet != null) {
PipeTabletUtils.compactBitMaps(tablet);
- // TODO: non-PipeInsertionEvent sourceEvent is not supported?
- final PipeInsertionEvent pipeInsertionEvent =
- sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent)
sourceEvent) : null;
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
isTableModel,
sourceEventDataBaseName,
- pipeInsertionEvent == null ? null :
pipeInsertionEvent.getRawTableModelDataBase(),
- pipeInsertionEvent == null ? null :
pipeInsertionEvent.getRawTreeModelDataBase(),
+ rawTableModelDataBaseName,
+ rawTreeModelDataBaseName,
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
index 4f387e71883..6a356aeca6d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
@@ -33,19 +33,25 @@ public abstract class PipeRawTabletEventConverter
implements DataCollector {
protected boolean isAligned = false;
protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
protected final EnrichedEvent sourceEvent; // Used to report progress
- protected final String sourceEventDataBaseName;
- protected final Boolean isTableModel;
+ protected String sourceEventDataBaseName;
+ protected Boolean isTableModel;
+ protected String rawTableModelDataBaseName;
+ protected String rawTreeModelDataBaseName;
public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent
sourceEvent) {
this.pipeTaskMeta = pipeTaskMeta;
this.sourceEvent = sourceEvent;
if (sourceEvent instanceof PipeInsertionEvent) {
- sourceEventDataBaseName =
- ((PipeInsertionEvent)
sourceEvent).getSourceDatabaseNameFromDataRegion();
- isTableModel = ((PipeInsertionEvent)
sourceEvent).getRawIsTableModelEvent();
+ final PipeInsertionEvent pipeInsertionEvent = (PipeInsertionEvent)
sourceEvent;
+ sourceEventDataBaseName =
pipeInsertionEvent.getSourceDatabaseNameFromDataRegion();
+ isTableModel = pipeInsertionEvent.getRawIsTableModelEvent();
+ rawTableModelDataBaseName =
pipeInsertionEvent.getRawTableModelDataBase();
+ rawTreeModelDataBaseName = pipeInsertionEvent.getRawTreeModelDataBase();
} else {
sourceEventDataBaseName = null;
isTableModel = null;
+ rawTableModelDataBaseName = null;
+ rawTreeModelDataBaseName = null;
}
}
@@ -54,12 +60,34 @@ public abstract class PipeRawTabletEventConverter
implements DataCollector {
EnrichedEvent sourceEvent,
String sourceEventDataBase,
Boolean isTableModel) {
- this.pipeTaskMeta = pipeTaskMeta;
- this.sourceEvent = sourceEvent;
+ this(pipeTaskMeta, sourceEvent);
this.sourceEventDataBaseName = sourceEventDataBase;
this.isTableModel = isTableModel;
}
+ public PipeRawTabletEventConverter(
+ PipeTaskMeta pipeTaskMeta,
+ EnrichedEvent sourceEvent,
+ String sourceEventDataBase,
+ Boolean isTableModel,
+ String rawTableModelDataBaseName,
+ String rawTreeModelDataBaseName) {
+ this(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
+ this.rawTableModelDataBaseName = rawTableModelDataBaseName;
+ this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
+ }
+
+ public void resetDatabaseInfo(
+ final String sourceEventDataBaseName,
+ final Boolean isTableModel,
+ final String rawTableModelDataBaseName,
+ final String rawTreeModelDataBaseName) {
+ this.sourceEventDataBaseName = sourceEventDataBaseName;
+ this.isTableModel = isTableModel;
+ this.rawTableModelDataBaseName = rawTableModelDataBaseName;
+ this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
+ }
+
@Override
public List<TabletInsertionEvent> convertToTabletInsertionEvents(final
boolean shouldReport) {
final int eventListSize = tabletInsertionEventList.size();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index beacc54e705..b9da59f3111 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.pipe.api.collector.TabletCollector;
import org.apache.tsfile.write.record.Tablet;
@@ -40,16 +39,30 @@ public class PipeTabletCollector extends
PipeRawTabletEventConverter implements
super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
}
+ public PipeTabletCollector(
+ PipeTaskMeta pipeTaskMeta,
+ EnrichedEvent sourceEvent,
+ String sourceEventDataBase,
+ Boolean isTableModel,
+ String rawTableModelDataBaseName,
+ String rawTreeModelDataBaseName) {
+ super(
+ pipeTaskMeta,
+ sourceEvent,
+ sourceEventDataBase,
+ isTableModel,
+ rawTableModelDataBaseName,
+ rawTreeModelDataBaseName);
+ }
+
@Override
public void collectTablet(final Tablet tablet) {
- final PipeInsertionEvent pipeInsertionEvent =
- sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent)
sourceEvent) : null;
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
isTableModel,
sourceEventDataBaseName,
- pipeInsertionEvent == null ? null :
pipeInsertionEvent.getRawTableModelDataBase(),
- pipeInsertionEvent == null ? null :
pipeInsertionEvent.getRawTreeModelDataBase(),
+ rawTableModelDataBaseName,
+ rawTreeModelDataBaseName,
tablet,
isAligned,
sourceEvent == null ? null : sourceEvent.getPipeName(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index b584db153d0..6d27b9f7dd5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -116,6 +116,7 @@ public class AggregateProcessor implements PipeProcessor {
private PipeTaskMeta pipeTaskMeta;
private long outputMaxDelayMilliseconds;
private long outputMinReportIntervalMilliseconds;
+ private String outputDatabase;
private String outputDatabaseWithPathSeparator;
private final Map<String, AggregatedResultOperator> outputName2OperatorMap =
new HashMap<>();
@@ -226,7 +227,7 @@ public class AggregateProcessor implements PipeProcessor {
PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY,
PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE)
* 1000;
- final String outputDatabase =
+ outputDatabase =
parameters.getStringOrDefault(
PROCESSOR_OUTPUT_DATABASE_KEY,
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE);
outputDatabaseWithPathSeparator =
@@ -427,6 +428,8 @@ public class AggregateProcessor implements PipeProcessor {
final Row row, final RowCollector rowCollector, final
AtomicReference<Exception> exception) {
final Map<String, Pair<Long, ByteBuffer>> resultMap = new HashMap<>();
+ resetOutputDatabaseForGeneratedEvent(rowCollector);
+
final long timestamp = row.getTime();
for (int index = 0, size = row.size(); index < size; ++index) {
// Do not calculate null values
@@ -580,6 +583,7 @@ public class AggregateProcessor implements PipeProcessor {
synchronized (stateReference) {
final PipeRowCollector rowCollector =
new PipeRowCollector(pipeTaskMeta, null, dataBaseName,
isTableModel);
+ resetOutputDatabaseForGeneratedEvent(rowCollector);
try {
collectWindowOutputs(
stateReference.get().forceOutput(), timeSeries,
rowCollector);
@@ -612,6 +616,13 @@ public class AggregateProcessor implements PipeProcessor {
eventCollector.collect(event);
}
+ private void resetOutputDatabaseForGeneratedEvent(final RowCollector
rowCollector) {
+ if (!outputDatabase.isEmpty() && rowCollector instanceof PipeRowCollector)
{
+ ((PipeRowCollector) rowCollector)
+ .resetDatabaseInfo(outputDatabase, Boolean.FALSE, null,
outputDatabase);
+ }
+ }
+
/**
* Collect {@link WindowOutput}s of a single timeSeries in one turn. The
{@link TSDataType}s shall
* be the same because the {@link AggregatedResultOperator}s shall return
the same value for the
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 6b1f9e3d3d3..1aface542b1 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -31,6 +31,8 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
@@ -54,6 +56,7 @@ import org.junit.Test;
import java.time.LocalDate;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import static
org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern.buildUnionPattern;
@@ -321,6 +324,35 @@ public class PipeTabletInsertionEventTest {
Assert.assertTrue(isAligned4);
}
+ @Test
+ public void collectRowWithOverriddenTreeDatabaseForTest() {
+ final PipeRowCollector rowCollector = new PipeRowCollector(null, null,
"root.test.sg_0", false);
+ rowCollector.resetDatabaseInfo("root.userResultDB", false, null,
"root.userResultDB");
+
+ final MeasurementSchema[] outputSchemas = {new MeasurementSchema("avg",
TSDataType.INT32)};
+ rowCollector.collectRow(
+ new PipeResetTabletRow(
+ 0,
+ "root.userResultDB.d_0.s_1",
+ false,
+ outputSchemas,
+ new long[] {1L},
+ new TSDataType[] {TSDataType.INT32},
+ new Object[] {new int[] {1}},
+ null,
+ new String[] {"avg"}));
+
+ final
List<org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent> events
=
+ rowCollector.convertToTabletInsertionEvents(false);
+ Assert.assertEquals(1, events.size());
+
+ final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent)
events.get(0);
+ Assert.assertEquals("root.userResultDB",
event.getSourceDatabaseNameFromDataRegion());
+ Assert.assertFalse(event.isTableModelEvent());
+ Assert.assertEquals("root.userResultDB", event.getTreeModelDatabaseName());
+ Assert.assertEquals("root.userResultDB.d_0.s_1",
event.convertToTablet().getDeviceId());
+ }
+
@Test
public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception
{
final BitMap[] bitMaps = new BitMap[schemas.length];
diff --git
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
index 5a2e4196197..f470f876fcf 100644
---
a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
+++
b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -69,6 +69,14 @@ public final class CommonMessages {
// --- subscription ---
public static final String CONFIG_PRINT = "{}: {}";
+ // --- partition ---
+ public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED =
+ "Database %s not exists and failed to create automatically because
enable_auto_create_schema is FALSE.";
+ public static final String DATA_PARTITION_EMPTY =
+ "Data partition is empty. device: %s, seriesSlot: %s, database: %s";
+ public static final String TARGET_REGION_LIST_EMPTY =
+ "targetRegionList is empty. device: %s, timeSlot: %s";
+
// --- concurrent ---
public static final String EXCEPTION_IN_THREAD = "Exception in thread {}-{}";
public static final String INTERRUPTED_WHILE_AWAITING = "Interrupted while
awaiting condition";
diff --git
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
index a4b72b9e427..dc31b8296e9 100644
---
a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
+++
b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -68,6 +68,14 @@ public final class CommonMessages {
// --- subscription ---
public static final String CONFIG_PRINT = "{}:{}";
+ // --- partition ---
+ public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED =
+ "Database %s 不存在,且由于 enable_auto_create_schema 为 FALSE,无法自动创建。";
+ public static final String DATA_PARTITION_EMPTY =
+ "数据分区为空。device: %s, seriesSlot: %s, database: %s";
+ public static final String TARGET_REGION_LIST_EMPTY =
+ "targetRegionList 为空。device: %s, timeSlot: %s";
+
// --- concurrent ---
public static final String EXCEPTION_IN_THREAD = "线程 {}-{} 中发生异常";
public static final String INTERRUPTED_WHILE_AWAITING = "等待条件时被中断";
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 100c40eddcc..43085f04987 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.i18n.CommonMessages;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
@@ -221,15 +222,22 @@ public class DataPartition extends Partition {
final List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>();
final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot,
List<TRegionReplicaSet>>>
dataBasePartitionMap = dataPartitionMap.get(databaseName);
+ if (dataBasePartitionMap == null) {
+ throw new RuntimeException(
+
String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED,
databaseName));
+ }
final Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap =
dataBasePartitionMap.get(seriesPartitionSlot);
+ if (slotReplicaSetMap == null) {
+ throw new RuntimeException(
+ String.format(
+ CommonMessages.DATA_PARTITION_EMPTY, deviceID,
seriesPartitionSlot, databaseName));
+ }
for (final TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
final List<TRegionReplicaSet> targetRegionList =
slotReplicaSetMap.get(timePartitionSlot);
if (targetRegionList == null || targetRegionList.isEmpty()) {
throw new RuntimeException(
- String.format(
- "targetRegionList is empty. device: %s, timeSlot: %s",
- deviceID, timePartitionSlot));
+ String.format(CommonMessages.TARGET_REGION_LIST_EMPTY, deviceID,
timePartitionSlot));
} else {
dataRegionReplicaSets.add(targetRegionList.get(targetRegionList.size()
- 1));
}
@@ -250,9 +258,7 @@ public class DataPartition extends Partition {
databasePartitionMap = dataPartitionMap.get(databaseName);
if (databasePartitionMap == null) {
throw new RuntimeException(
- "Database "
- + databaseName
- + " not exists and failed to create automatically because
enable_auto_create_schema is FALSE.");
+
String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED,
databaseName));
}
final List<TRegionReplicaSet> regions =
databasePartitionMap.get(seriesPartitionSlot).get(timePartitionSlot);