This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f207172056 Fix aggregate write-back output database metadata (#17938)
3f207172056 is described below

commit 3f20717205688be0e3d98b7911bfc4284a1d95d3
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 16 16:14:15 2026 +0800

    Fix aggregate write-back output database metadata (#17938)
    
    * Fix aggregate write-back output database metadata
    
    * Fixed i18n
---
 .../db/pipe/event/common/row/PipeRowCollector.java | 24 +++++++++----
 .../common/tablet/PipeRawTabletEventConverter.java | 42 ++++++++++++++++++----
 .../event/common/tablet/PipeTabletCollector.java   | 23 +++++++++---
 .../processor/aggregate/AggregateProcessor.java    | 13 ++++++-
 .../pipe/event/PipeTabletInsertionEventTest.java   | 32 +++++++++++++++++
 .../apache/iotdb/commons/i18n/CommonMessages.java  |  8 +++++
 .../apache/iotdb/commons/i18n/CommonMessages.java  |  8 +++++
 .../iotdb/commons/partition/DataPartition.java     | 18 ++++++----
 8 files changed, 143 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 326d9c7d31e..efd442fe428 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.db.pipe.event.common.row;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.db.i18n.DataNodePipeMessages;
-import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
@@ -56,6 +55,22 @@ public class PipeRowCollector extends 
PipeRawTabletEventConverter implements Row
     super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
   }
 
+  public PipeRowCollector(
+      PipeTaskMeta pipeTaskMeta,
+      EnrichedEvent sourceEvent,
+      String sourceEventDataBase,
+      Boolean isTableModel,
+      String rawTableModelDataBaseName,
+      String rawTreeModelDataBaseName) {
+    super(
+        pipeTaskMeta,
+        sourceEvent,
+        sourceEventDataBase,
+        isTableModel,
+        rawTableModelDataBaseName,
+        rawTreeModelDataBaseName);
+  }
+
   @Override
   public void collectRow(Row row) {
     if (!(row instanceof PipeRow)) {
@@ -106,15 +121,12 @@ public class PipeRowCollector extends 
PipeRawTabletEventConverter implements Row
   private void collectTabletInsertionEvent() {
     if (tablet != null) {
       PipeTabletUtils.compactBitMaps(tablet);
-      // TODO: non-PipeInsertionEvent sourceEvent is not supported?
-      final PipeInsertionEvent pipeInsertionEvent =
-          sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) 
sourceEvent) : null;
       tabletInsertionEventList.add(
           new PipeRawTabletInsertionEvent(
               isTableModel,
               sourceEventDataBaseName,
-              pipeInsertionEvent == null ? null : 
pipeInsertionEvent.getRawTableModelDataBase(),
-              pipeInsertionEvent == null ? null : 
pipeInsertionEvent.getRawTreeModelDataBase(),
+              rawTableModelDataBaseName,
+              rawTreeModelDataBaseName,
               tablet,
               isAligned,
               sourceEvent == null ? null : sourceEvent.getPipeName(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
index 4f387e71883..6a356aeca6d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletEventConverter.java
@@ -33,19 +33,25 @@ public abstract class PipeRawTabletEventConverter 
implements DataCollector {
   protected boolean isAligned = false;
   protected final PipeTaskMeta pipeTaskMeta; // Used to report progress
   protected final EnrichedEvent sourceEvent; // Used to report progress
-  protected final String sourceEventDataBaseName;
-  protected final Boolean isTableModel;
+  protected String sourceEventDataBaseName;
+  protected Boolean isTableModel;
+  protected String rawTableModelDataBaseName;
+  protected String rawTreeModelDataBaseName;
 
   public PipeRawTabletEventConverter(PipeTaskMeta pipeTaskMeta, EnrichedEvent 
sourceEvent) {
     this.pipeTaskMeta = pipeTaskMeta;
     this.sourceEvent = sourceEvent;
     if (sourceEvent instanceof PipeInsertionEvent) {
-      sourceEventDataBaseName =
-          ((PipeInsertionEvent) 
sourceEvent).getSourceDatabaseNameFromDataRegion();
-      isTableModel = ((PipeInsertionEvent) 
sourceEvent).getRawIsTableModelEvent();
+      final PipeInsertionEvent pipeInsertionEvent = (PipeInsertionEvent) 
sourceEvent;
+      sourceEventDataBaseName = 
pipeInsertionEvent.getSourceDatabaseNameFromDataRegion();
+      isTableModel = pipeInsertionEvent.getRawIsTableModelEvent();
+      rawTableModelDataBaseName = 
pipeInsertionEvent.getRawTableModelDataBase();
+      rawTreeModelDataBaseName = pipeInsertionEvent.getRawTreeModelDataBase();
     } else {
       sourceEventDataBaseName = null;
       isTableModel = null;
+      rawTableModelDataBaseName = null;
+      rawTreeModelDataBaseName = null;
     }
   }
 
@@ -54,12 +60,34 @@ public abstract class PipeRawTabletEventConverter 
implements DataCollector {
       EnrichedEvent sourceEvent,
       String sourceEventDataBase,
       Boolean isTableModel) {
-    this.pipeTaskMeta = pipeTaskMeta;
-    this.sourceEvent = sourceEvent;
+    this(pipeTaskMeta, sourceEvent);
     this.sourceEventDataBaseName = sourceEventDataBase;
     this.isTableModel = isTableModel;
   }
 
+  public PipeRawTabletEventConverter(
+      PipeTaskMeta pipeTaskMeta,
+      EnrichedEvent sourceEvent,
+      String sourceEventDataBase,
+      Boolean isTableModel,
+      String rawTableModelDataBaseName,
+      String rawTreeModelDataBaseName) {
+    this(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
+    this.rawTableModelDataBaseName = rawTableModelDataBaseName;
+    this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
+  }
+
+  public void resetDatabaseInfo(
+      final String sourceEventDataBaseName,
+      final Boolean isTableModel,
+      final String rawTableModelDataBaseName,
+      final String rawTreeModelDataBaseName) {
+    this.sourceEventDataBaseName = sourceEventDataBaseName;
+    this.isTableModel = isTableModel;
+    this.rawTableModelDataBaseName = rawTableModelDataBaseName;
+    this.rawTreeModelDataBaseName = rawTreeModelDataBaseName;
+  }
+
   @Override
   public List<TabletInsertionEvent> convertToTabletInsertionEvents(final 
boolean shouldReport) {
     final int eventListSize = tabletInsertionEventList.size();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
index beacc54e705..b9da59f3111 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletCollector.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.pipe.event.common.tablet;
 
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
-import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
 import org.apache.iotdb.pipe.api.collector.TabletCollector;
 
 import org.apache.tsfile.write.record.Tablet;
@@ -40,16 +39,30 @@ public class PipeTabletCollector extends 
PipeRawTabletEventConverter implements
     super(pipeTaskMeta, sourceEvent, sourceEventDataBase, isTableModel);
   }
 
+  public PipeTabletCollector(
+      PipeTaskMeta pipeTaskMeta,
+      EnrichedEvent sourceEvent,
+      String sourceEventDataBase,
+      Boolean isTableModel,
+      String rawTableModelDataBaseName,
+      String rawTreeModelDataBaseName) {
+    super(
+        pipeTaskMeta,
+        sourceEvent,
+        sourceEventDataBase,
+        isTableModel,
+        rawTableModelDataBaseName,
+        rawTreeModelDataBaseName);
+  }
+
   @Override
   public void collectTablet(final Tablet tablet) {
-    final PipeInsertionEvent pipeInsertionEvent =
-        sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent) 
sourceEvent) : null;
     tabletInsertionEventList.add(
         new PipeRawTabletInsertionEvent(
             isTableModel,
             sourceEventDataBaseName,
-            pipeInsertionEvent == null ? null : 
pipeInsertionEvent.getRawTableModelDataBase(),
-            pipeInsertionEvent == null ? null : 
pipeInsertionEvent.getRawTreeModelDataBase(),
+            rawTableModelDataBaseName,
+            rawTreeModelDataBaseName,
             tablet,
             isAligned,
             sourceEvent == null ? null : sourceEvent.getPipeName(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
index b584db153d0..6d27b9f7dd5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java
@@ -116,6 +116,7 @@ public class AggregateProcessor implements PipeProcessor {
   private PipeTaskMeta pipeTaskMeta;
   private long outputMaxDelayMilliseconds;
   private long outputMinReportIntervalMilliseconds;
+  private String outputDatabase;
   private String outputDatabaseWithPathSeparator;
 
   private final Map<String, AggregatedResultOperator> outputName2OperatorMap = 
new HashMap<>();
@@ -226,7 +227,7 @@ public class AggregateProcessor implements PipeProcessor {
                 PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_KEY,
                 PROCESSOR_OUTPUT_MIN_REPORT_INTERVAL_SECONDS_DEFAULT_VALUE)
             * 1000;
-    final String outputDatabase =
+    outputDatabase =
         parameters.getStringOrDefault(
             PROCESSOR_OUTPUT_DATABASE_KEY, 
PROCESSOR_OUTPUT_DATABASE_DEFAULT_VALUE);
     outputDatabaseWithPathSeparator =
@@ -427,6 +428,8 @@ public class AggregateProcessor implements PipeProcessor {
       final Row row, final RowCollector rowCollector, final 
AtomicReference<Exception> exception) {
     final Map<String, Pair<Long, ByteBuffer>> resultMap = new HashMap<>();
 
+    resetOutputDatabaseForGeneratedEvent(rowCollector);
+
     final long timestamp = row.getTime();
     for (int index = 0, size = row.size(); index < size; ++index) {
       // Do not calculate null values
@@ -580,6 +583,7 @@ public class AggregateProcessor implements PipeProcessor {
                 synchronized (stateReference) {
                   final PipeRowCollector rowCollector =
                       new PipeRowCollector(pipeTaskMeta, null, dataBaseName, 
isTableModel);
+                  resetOutputDatabaseForGeneratedEvent(rowCollector);
                   try {
                     collectWindowOutputs(
                         stateReference.get().forceOutput(), timeSeries, 
rowCollector);
@@ -612,6 +616,13 @@ public class AggregateProcessor implements PipeProcessor {
     eventCollector.collect(event);
   }
 
+  private void resetOutputDatabaseForGeneratedEvent(final RowCollector 
rowCollector) {
+    if (!outputDatabase.isEmpty() && rowCollector instanceof PipeRowCollector) 
{
+      ((PipeRowCollector) rowCollector)
+          .resetDatabaseInfo(outputDatabase, Boolean.FALSE, null, 
outputDatabase);
+    }
+  }
+
   /**
    * Collect {@link WindowOutput}s of a single timeSeries in one turn. The 
{@link TSDataType}s shall
    * be the same because the {@link AggregatedResultOperator}s shall return 
the same value for the
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 6b1f9e3d3d3..1aface542b1 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -31,6 +31,8 @@ import 
org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
 import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.pipe.event.common.row.PipeResetTabletRow;
+import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
 import 
org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser;
@@ -54,6 +56,7 @@ import org.junit.Test;
 import java.time.LocalDate;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static 
org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern.buildUnionPattern;
 
@@ -321,6 +324,35 @@ public class PipeTabletInsertionEventTest {
     Assert.assertTrue(isAligned4);
   }
 
+  @Test
+  public void collectRowWithOverriddenTreeDatabaseForTest() {
+    final PipeRowCollector rowCollector = new PipeRowCollector(null, null, 
"root.test.sg_0", false);
+    rowCollector.resetDatabaseInfo("root.userResultDB", false, null, 
"root.userResultDB");
+
+    final MeasurementSchema[] outputSchemas = {new MeasurementSchema("avg", 
TSDataType.INT32)};
+    rowCollector.collectRow(
+        new PipeResetTabletRow(
+            0,
+            "root.userResultDB.d_0.s_1",
+            false,
+            outputSchemas,
+            new long[] {1L},
+            new TSDataType[] {TSDataType.INT32},
+            new Object[] {new int[] {1}},
+            null,
+            new String[] {"avg"}));
+
+    final 
List<org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent> events 
=
+        rowCollector.convertToTabletInsertionEvents(false);
+    Assert.assertEquals(1, events.size());
+
+    final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) 
events.get(0);
+    Assert.assertEquals("root.userResultDB", 
event.getSourceDatabaseNameFromDataRegion());
+    Assert.assertFalse(event.isTableModelEvent());
+    Assert.assertEquals("root.userResultDB", event.getTreeModelDatabaseName());
+    Assert.assertEquals("root.userResultDB.d_0.s_1", 
event.convertToTablet().getDeviceId());
+  }
+
   @Test
   public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception 
{
     final BitMap[] bitMaps = new BitMap[schemas.length];
diff --git a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
index 5a2e4196197..f470f876fcf 100644
--- a/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
+++ b/iotdb-core/node-commons/src/main/i18n/en/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -69,6 +69,14 @@ public final class CommonMessages {
   // --- subscription ---
   public static final String CONFIG_PRINT = "{}: {}";
 
+  // --- partition ---
+  public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED =
+      "Database %s not exists and failed to create automatically because 
enable_auto_create_schema is FALSE.";
+  public static final String DATA_PARTITION_EMPTY =
+      "Data partition is empty. device: %s, seriesSlot: %s, database: %s";
+  public static final String TARGET_REGION_LIST_EMPTY =
+      "targetRegionList is empty. device: %s, timeSlot: %s";
+
   // --- concurrent ---
   public static final String EXCEPTION_IN_THREAD = "Exception in thread {}-{}";
   public static final String INTERRUPTED_WHILE_AWAITING = "Interrupted while 
awaiting condition";
diff --git a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
index a4b72b9e427..dc31b8296e9 100644
--- a/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
+++ b/iotdb-core/node-commons/src/main/i18n/zh/org/apache/iotdb/commons/i18n/CommonMessages.java
@@ -68,6 +68,14 @@ public final class CommonMessages {
   // --- subscription ---
   public static final String CONFIG_PRINT = "{}:{}";
 
+  // --- partition ---
+  public static final String DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED =
+      "Database %s 不存在,且由于 enable_auto_create_schema 为 FALSE,无法自动创建。";
+  public static final String DATA_PARTITION_EMPTY =
+      "数据分区为空。device: %s, seriesSlot: %s, database: %s";
+  public static final String TARGET_REGION_LIST_EMPTY =
+      "targetRegionList 为空。device: %s, timeSlot: %s";
+
   // --- concurrent ---
   public static final String EXCEPTION_IN_THREAD = "线程 {}-{} 中发生异常";
   public static final String INTERRUPTED_WHILE_AWAITING = "等待条件时被中断";
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
index 100c40eddcc..43085f04987 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartition.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.commons.partition;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
+import org.apache.iotdb.commons.i18n.CommonMessages;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
 
@@ -221,15 +222,22 @@ public class DataPartition extends Partition {
     final List<TRegionReplicaSet> dataRegionReplicaSets = new ArrayList<>();
     final Map<TSeriesPartitionSlot, Map<TTimePartitionSlot, 
List<TRegionReplicaSet>>>
         dataBasePartitionMap = dataPartitionMap.get(databaseName);
+    if (dataBasePartitionMap == null) {
+      throw new RuntimeException(
+          
String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED, 
databaseName));
+    }
     final Map<TTimePartitionSlot, List<TRegionReplicaSet>> slotReplicaSetMap =
         dataBasePartitionMap.get(seriesPartitionSlot);
+    if (slotReplicaSetMap == null) {
+      throw new RuntimeException(
+          String.format(
+              CommonMessages.DATA_PARTITION_EMPTY, deviceID, 
seriesPartitionSlot, databaseName));
+    }
     for (final TTimePartitionSlot timePartitionSlot : timePartitionSlotList) {
       final List<TRegionReplicaSet> targetRegionList = 
slotReplicaSetMap.get(timePartitionSlot);
       if (targetRegionList == null || targetRegionList.isEmpty()) {
         throw new RuntimeException(
-            String.format(
-                "targetRegionList is empty. device: %s, timeSlot: %s",
-                deviceID, timePartitionSlot));
+            String.format(CommonMessages.TARGET_REGION_LIST_EMPTY, deviceID, 
timePartitionSlot));
       } else {
         dataRegionReplicaSets.add(targetRegionList.get(targetRegionList.size() 
- 1));
       }
@@ -250,9 +258,7 @@ public class DataPartition extends Partition {
         databasePartitionMap = dataPartitionMap.get(databaseName);
     if (databasePartitionMap == null) {
       throw new RuntimeException(
-          "Database "
-              + databaseName
-              + " not exists and failed to create automatically because 
enable_auto_create_schema is FALSE.");
+          
String.format(CommonMessages.DATABASE_NOT_EXISTS_AND_AUTO_CREATE_DISABLED, 
databaseName));
     }
     final List<TRegionReplicaSet> regions =
         databasePartitionMap.get(seriesPartitionSlot).get(timePartitionSlot);

Reply via email to