This is an automated email from the ASF dual-hosted git repository. JackieTien97 pushed a commit to branch rc/2.0.10 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c7ba5da546a3e18816d974c765769200094eaa30 Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 16 16:14:15 2026 +0800 Fix aggregate write-back output database metadata (#17938) * Fix aggregate write-back output database metadata * Fixed i18n --- .../db/pipe/event/common/row/PipeRowCollector.java | 24 +++++++++---- .../common/tablet/PipeRawTabletEventConverter.java | 42 ++++++++++++++++++---- .../event/common/tablet/PipeTabletCollector.java | 23 +++++++++--- .../processor/aggregate/AggregateProcessor.java | 13 ++++++- .../pipe/event/PipeTabletInsertionEventTest.java | 32 +++++++++++++++++ .../apache/iotdb/commons/i18n/CommonMessages.java | 8 +++++ .../apache/iotdb/commons/i18n/CommonMessages.java | 8 +++++ .../iotdb/commons/partition/DataPartition.java | 18 ++++++---- 8 files changed, 143 insertions(+), 25 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java index 326d9c7d31e..efd442fe428 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java @@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.event.common.row; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.db.i18n.DataNodePipeMessages; -import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils; @@ -56,6 +55,22 @@ public class PipeRowCollector extends PipeRawTabletEventConverter implements Row super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); } + public PipeRowCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + super( + pipeTaskMeta, + sourceEvent, + sourceEventDataBase, + isTableModel, + rawTableModelDataBaseName, + rawTreeModelDataBaseName); + } + @Override public void collectRow(Row row) { if (!(row instanceof PipeRow)) { @@ -106,15 +121,12 @@ public class PipeRowCollector extends PipeRawTabletEventConverter implements Row private void collectTabletInsertionEvent() { if (tablet != null) { PipeTabletUtils.compactBitMaps(tablet); - // TODO: non-PipeInsertionEvent sourceEvent is not supported? - final PipeInsertionEvent pipeInsertionEvent = - sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null; tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( isTableModel, sourceEventDataBaseName, - pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(), - pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(), + rawTableModelDataBaseName, + rawTreeModelDataBaseName, tablet, isAligned, sourceEvent == null ? null : sourceEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java index 4f387e71883..6a356aeca6d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java @@ -33,19 +33,25 @@ public abstract class PipeRawTabletEventConverter implements DataCollector { protected boolean isAligned = false; protected final PipeTaskMeta pipeTaskMeta; // Used to report progress protected final EnrichedEvent sourceEvent; // Used to report progress - protected final String sourceEventDataBaseName; - protected final Boolean isTableModel; + protected String sourceEventDataBaseName; + protected Boolean isTableModel; + protected String rawTableModelDataBaseName; + protected String rawTreeModelDataBaseName; public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent sourceEvent) { this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; if (sourceEvent instanceof PipeInsertionEvent) { - sourceEventDataBaseName = - ((PipeInsertionEvent) sourceEvent).getSourceDatabaseNameFromDataRegion(); - isTableModel = ((PipeInsertionEvent) sourceEvent).getRawIsTableModelEvent(); + final PipeInsertionEvent pipeInsertionEvent = (PipeInsertionEvent) sourceEvent; + sourceEventDataBaseName = pipeInsertionEvent.getSourceDatabaseNameFromDataRegion(); + isTableModel = pipeInsertionEvent.getRawIsTableModelEvent(); + rawTableModelDataBaseName = pipeInsertionEvent.getRawTableModelDataBase(); + rawTreeModelDataBaseName = pipeInsertionEvent.getRawTreeModelDataBase(); } else { sourceEventDataBaseName = null; isTableModel = null; + rawTableModelDataBaseName = null; + rawTreeModelDataBaseName = null; } } @@ -54,12 +60,34 @@ public abstract class PipeRawTabletEventConverter implements DataCollector { EnrichedEvent sourceEvent, String sourceEventDataBase, Boolean isTableModel) { - this.pipeTaskMeta = pipeTaskMeta; - this.sourceEvent = sourceEvent; + this(pipeTaskMeta, sourceEvent); this.sourceEventDataBaseName = sourceEventDataBase; this.isTableModel = isTableModel; } + public PipeRawTabletEventConverter( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + this(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); + this.rawTableModelDataBaseName = rawTableModelDataBaseName; + this.rawTreeModelDataBaseName = rawTreeModelDataBaseName; + } + + public void resetDatabaseInfo( + final String sourceEventDataBaseName, + final Boolean isTableModel, + final String rawTableModelDataBaseName, + final String rawTreeModelDataBaseName) { + this.sourceEventDataBaseName = sourceEventDataBaseName; + this.isTableModel = isTableModel; + this.rawTableModelDataBaseName = rawTableModelDataBaseName; + this.rawTreeModelDataBaseName = rawTreeModelDataBaseName; + } + @Override public List<TabletInsertionEvent> convertToTabletInsertionEvents(final boolean shouldReport) { final int eventListSize = tabletInsertionEventList.size(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java index beacc54e705..b9da59f3111 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java @@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.tablet; import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; -import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.pipe.api.collector.TabletCollector; import org.apache.tsfile.write.record.Tablet; @@ -40,16 +39,30 @@ public class PipeTabletCollector extends PipeRawTabletEventConverter implements super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel); } + public PipeTabletCollector( + PipeTaskMeta pipeTaskMeta, + EnrichedEvent sourceEvent, + String sourceEventDataBase, + Boolean isTableModel, + String rawTableModelDataBaseName, + String rawTreeModelDataBaseName) { + super( + pipeTaskMeta, + sourceEvent, + sourceEventDataBase, + isTableModel, + rawTableModelDataBaseName, + rawTreeModelDataBaseName); + } + @Override public void collectTablet(final Tablet tablet) { - final PipeInsertionEvent pipeInsertionEvent = - sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) sourceEvent) : null; tabletInsertionEventList.add( new PipeRawTabletInsertionEvent( isTableModel, sourceEventDataBaseName, - pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTableModelDataBase(), - pipeInsertionEvent == null ? null : pipeInsertionEvent.getRawTreeModelDataBase(), + rawTableModelDataBaseName, + rawTreeModelDataBaseName, tablet, isAligned, sourceEvent == null ? null : sourceEvent.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java index b584db153d0..6d27b9f7dd5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java @@ -116,6 +116,7 @@ public class AggregateProcessor implements PipeProcessor { private PipeTaskMeta pipeTaskMeta; private long outputMaxDelayMilliseconds; private long outputMinReportIntervalMilliseconds; + private String outputDatabase; private String outputDatabaseWithPathSeparator; private final Map<String, AggregatedResultOperator> outputName2OperatorMap = new HashMap<>(); @@ -226,7 +227,7 @@ public class AggregateProcessor implements PipeProcessor { PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY, PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE) * 1000; - final String outputDatabase = + outputDatabase = parameters.getStringOrDefault( PROCESSOR_OUTPUT_DATABASE_KEY, PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE); outputDatabaseWithPathSeparator = @@ -427,6 +428,8 @@ public class AggregateProcessor implements PipeProcessor { final Row row, final RowCollector rowCollector, final AtomicReference<Exception> exception) { final Map<String, Pair<Long, ByteBuffer>> resultMap = new HashMap<>(); + resetOutputDatabaseForGeneratedEvent(rowCollector); + final long timestamp = row.getTime(); for (int index = 0, size = row.size(); index < size; ++index) { // Do not calculate null values @@ -580,6 +583,7 @@ public class AggregateProcessor implements PipeProcessor { synchronized (stateReference) { final PipeRowCollector rowCollector = new PipeRowCollector(pipeTaskMeta, null, dataBaseName, isTableModel); + resetOutputDatabaseForGeneratedEvent(rowCollector); try { collectWindowOutputs( stateReference.get().forceOutput(), timeSeries, rowCollector); @@ -612,6 +616,13 @@ public class AggregateProcessor implements PipeProcessor { eventCollector.collect(event); } + private void resetOutputDatabaseForGeneratedEvent(final RowCollector rowCollector) { + if (!outputDatabase.isEmpty() && rowCollector instanceof PipeRowCollector) { + ((PipeRowCollector) rowCollector) + .resetDatabaseInfo(outputDatabase, Boolean.FALSE, null, outputDatabase); + } + } + /** * Collect {@link WindowOutput}s of a single timeSeries in one turn. The {@link TSDataType}s shall * be the same because the {@link AggregatedResultOperator}s shall return the same value for the diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index da3dee91caa..052aab50732 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -31,6 +31,8 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow; +import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; @@ -52,6 +54,7 @@ import org.junit.Test; import java.time.LocalDate; import java.util.Arrays; import java.util.Collections; +import java.util.List; import static org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern.buildUnionPattern; @@ -319,6 +322,35 @@ public class PipeTabletInsertionEventTest { Assert.assertTrue(isAligned4); } + @Test + public void collectRowWithOverriddenTreeDatabaseForTest() { + final PipeRowCollector rowCollector = new PipeRowCollector(null, null, "root.test.sg_0", false); + rowCollector.resetDatabaseInfo("root.userResultDB", false, null, "root.userResultDB"); + + final MeasurementSchema[] outputSchemas = {new MeasurementSchema("avg", TSDataType.INT32)}; + rowCollector.collectRow( + new PipeResetTabletRow( + 0, + "root.userResultDB.d_0.s_1", + false, + outputSchemas, + new long[] {1L}, + new TSDataType[] {TSDataType.INT32}, + new Object[] {new int[] {1}}, + null, + new String[] {"avg"})); + + final List<org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent> events = + rowCollector.convertToTabletInsertionEvents(false); + Assert.assertEquals(1, events.size()); + + final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) events.get(0); + Assert.assertEquals("root.userResultDB", event.getSourceDatabaseNameFromDataRegion()); + Assert.assertFalse(event.isTableModelEvent()); + Assert.assertEquals("root.userResultDB", event.getTreeModelDatabaseName()); + Assert.assertEquals("root.userResultDB.d_0.s_1", event.convertToTablet().getDeviceId()); + } + @Test public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception { final BitMap[] bitMaps = new BitMap[schemas.length]; diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java index 5a2e4196197..f470f876fcf 100644 --- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -69,6 +69,14 @@ public final class CommonMessages { // --- subscription --- public static final String CONFIG_PRINT = "{}: {}"; + // --- partition --- + public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED = + "Database %s not exists and failed to create automatically because enable_auto_create_schema is FALSE."; + public static final String DATA_PARTITION_EMPTY = + "Data partition is empty. device: %s, seriesSlot: %s, database: %s"; + public static final String TARGET_REGION_LIST_EMPTY = + "targetRegionList is empty. device: %s, timeSlot: %s"; + // --- concurrent --- public static final String EXCEPTION_IN_THREAD = "Exception in thread {}-{}"; public static final String INTERRUPTED_WHILE_AWAITING = "Interrupted while awaiting condition"; diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java index a4b72b9e427..dc31b8296e9 100644 --- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java +++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java @@ -68,6 +68,14 @@ public final class CommonMessages { // --- subscription --- public static final String CONFIG_PRINT = "{}:{}"; + // --- partition --- + public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED = + "Database %s 不存在,且由于 enable_auto_create_schema 为 FALSE,无法自动创建。"; + public static final String DATA_PARTITION_EMPTY = + "数据分区为空。device: %s, seriesSlot: %s, database: %s"; + public static final String TARGET_REGION_LIST_EMPTY = + "targetRegionList 为空。device: %s, timeSlot: %s"; + // --- concurrent --- public static final String EXCEPTION_IN_THREAD = "线程 {}-{} 中发生异常"; public static final String INTERRUPTED_WHILE_AWAITING = "等待条件时被中断"; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java index 100c40eddcc..43085f04987 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java @@ -22,6 +22,7 @@ package org.apache.iotdb.commons.partition; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot; +import org.apache.iotdb.commons.i18n.CommonMessages; import org.apache.iotdb.commons.utils.PathUtils; import org.apache.iotdb.commons.utils.TimePartitionUtils; @@ -221,15 +222,22 @@ public class DataPartition extends Partition { final List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>(); final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, List<TRegionReplicaSet>>> dataBasePartitionMap = dataPartitionMap.get(databaseName); + if (dataBasePartitionMap == null) { + throw new RuntimeException( + String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED, databaseName)); + } final Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap = dataBasePartitionMap.get(seriesPartitionSlot); + if (slotReplicaSetMap == null) { + throw new RuntimeException( + String.format( + CommonMessages.DATA_PARTITION_EMPTY, deviceID, seriesPartitionSlot, databaseName)); + } for (final TTimePartitionSlot timePartitionSlot : timePartitionSlotList) { final List<TRegionReplicaSet> targetRegionList = slotReplicaSetMap.get(timePartitionSlot); if (targetRegionList == null || targetRegionList.isEmpty()) { throw new RuntimeException( - String.format( - "targetRegionList is empty. device: %s, timeSlot: %s", - deviceID, timePartitionSlot)); + String.format(CommonMessages.TARGET_REGION_LIST_EMPTY, deviceID, timePartitionSlot)); } else { dataRegionReplicaSets.add(targetRegionList.get(targetRegionList.size() - 1)); } @@ -250,9 +258,7 @@ public class DataPartition extends Partition { databasePartitionMap = dataPartitionMap.get(databaseName); if (databasePartitionMap == null) { throw new RuntimeException( - "Database " - + databaseName - + " not exists and failed to create automatically because enable_auto_create_schema is FALSE."); + String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED, databaseName)); } final List<TRegionReplicaSet> regions = databasePartitionMap.get(seriesPartitionSlot).get(timePartitionSlot);
