This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch index-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/index-13 by this push:
new 3b03f881362 [To dev/1.3] Pipe: Fixed the bug that a realtime only pipe
will not transfer historical data when it is altered to a historical pipe
(#17223)
3b03f881362 is described below
commit 3b03f881362971c0975c2813f2612987b2bb7335
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 27 15:58:54 2026 +0800
[To dev/1.3] Pipe: Fixed the bug that a realtime only pipe will not
transfer historical data when it is altered to a historical pipe (#17223)
---
.../iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java | 45 ++++++++++++++++++++++
.../impl/pipe/task/AlterPipeProcedureV2.java | 8 +++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
index 3e4200daf32..b9644159386 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeAlterIT.java
@@ -534,4 +534,49 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualAutoIT {
TestUtils.assertDataEventuallyOnEnv(
receiverEnv, "count timeSeries", "count(timeseries),",
Collections.singleton("3,"));
}
+
+ @Test
+ public void testAlterPipeRealtime() {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Insert data on sender
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)",
+ "flush"),
+ null);
+
+ // Create pipe
+ final String sql =
+ String.format(
+ "create pipe a2b with source ('history.enable'='false') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "count timeSeries root.db.**",
+ "count(timeseries),",
+ Collections.singleton("0,"));
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source
('history.enable'='true')");
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeSeries root.db.**",
+ "count(timeseries),",
+ Collections.singleton("1,"));
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 5a7b9b8a5b2..dcebb0b3d33 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -169,7 +170,12 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
updatedConsensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(
- currentPipeTaskMeta.getProgressIndex(),
+ PipeTaskAgent.isRealtimeOnlyPipe(
+
currentPipeStaticMeta.getExtractorParameters())
+ && !PipeTaskAgent.isRealtimeOnlyPipe(
+
updatedPipeStaticMeta.getExtractorParameters())
+ ? MinimumProgressIndex.INSTANCE
+ : currentPipeTaskMeta.getProgressIndex(),
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
&& !(!PipeTaskAgent.isHistoryOnlyPipe(
currentPipeStaticMeta.getExtractorParameters())