This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a434fa7c237 Pipe: Fixed the bug that a realtime only pipe will not
transfer historical data when it is altered to a historical pipe (#17223)
a434fa7c237 is described below
commit a434fa7c2375eb9b03dbb5b94a537dc8aa1a2652
Author: Caideyipi <[email protected]>
AuthorDate: Fri Feb 27 15:41:47 2026 +0800
Pipe: Fixed the bug that a realtime only pipe will not transfer historical
data when it is altered to a historical pipe (#17223)
* try-fix-index
* fix
* pi-fix
---
.../treemodel/auto/basic/IoTDBPipeAlterIT.java | 45 ++++++++++++++++++++++
.../impl/pipe/task/AlterPipeProcedureV2.java | 8 +++-
2 files changed, 52 insertions(+), 1 deletion(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
index 43fdf4ee5e5..d32bc33c956 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeAlterIT.java
@@ -570,4 +570,49 @@ public class IoTDBPipeAlterIT extends
AbstractPipeDualTreeModelAutoIT {
"count(timeseries),",
Collections.singleton("3,"));
}
+
+ @Test
+ public void testAlterPipeRealtime() {
+ final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);
+
+ // Insert data on sender
+ TestUtils.executeNonQueries(
+ senderEnv,
+ Arrays.asList(
+ "insert into root.db.d1 (time, at1) values (1000, 1), (1500, 2),
(2000, 3), (2500, 4), (3000, 5)",
+ "flush"),
+ null);
+
+ // Create pipe
+ final String sql =
+ String.format(
+ "create pipe a2b with source ('history.enable'='false') with sink
('node-urls'='%s')",
+ receiverDataNode.getIpAndPortString());
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(sql);
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ TestUtils.assertDataAlwaysOnEnv(
+ receiverEnv,
+ "count timeSeries root.db.**",
+ "count(timeseries),",
+ Collections.singleton("0,"));
+
+ try (final Connection connection = senderEnv.getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute("alter pipe a2b modify source
('history.enable'='true')");
+ } catch (final SQLException e) {
+ fail(e.getMessage());
+ }
+
+ TestUtils.assertDataEventuallyOnEnv(
+ receiverEnv,
+ "count timeSeries root.db.**",
+ "count(timeseries),",
+ Collections.singleton("1,"));
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
index 7b74b9dddc5..116c15dde22 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/pipe/task/AlterPipeProcedureV2.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.confignode.procedure.impl.pipe.task;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
import org.apache.iotdb.commons.pipe.agent.task.PipeTaskAgent;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeMeta;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeRuntimeMeta;
@@ -209,7 +210,12 @@ public class AlterPipeProcedureV2 extends
AbstractOperatePipeProcedureV2 {
updatedConsensusGroupIdToTaskMetaMap.put(
regionGroupId.getId(),
new PipeTaskMeta(
- currentPipeTaskMeta.getProgressIndex(),
+ PipeTaskAgent.isRealtimeOnlyPipe(
+
currentPipeStaticMeta.getSourceParameters())
+ && !PipeTaskAgent.isRealtimeOnlyPipe(
+
updatedPipeStaticMeta.getSourceParameters())
+ ? MinimumProgressIndex.INSTANCE
+ : currentPipeTaskMeta.getProgressIndex(),
PipeTaskMeta.isNewlyAdded(currentPipeTaskMeta.getLeaderNodeId())
&& !(!PipeTaskAgent.isHistoryOnlyPipe(
currentPipeStaticMeta.getSourceParameters())