This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e55f27ff [action] Support Reset next snapshot id of consumer id. 
(#1586)
9e55f27ff is described below

commit 9e55f27ff08c7a7f5dd17ad254467fd7ceeaaec2
Author: Kerwin <[email protected]>
AuthorDate: Mon Jul 24 18:10:04 2023 +0800

    [action] Support Reset next snapshot id of consumer id. (#1586)
---
 docs/content/how-to/querying-tables.md             | 28 +++++++
 .../apache/paimon/consumer/ConsumerManager.java    |  2 +-
 .../table/source/InnerStreamTableScanImpl.java     |  2 +-
 .../paimon/consumer/ConsumerManagerTest.java       | 10 +--
 .../paimon/flink/action/ResetConsumerAction.java   | 52 ++++++++++++
 .../flink/action/ResetConsumerActionFactory.java   | 70 ++++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |  3 +
 .../paimon/flink/action/ConsumerActionITCase.java  | 92 ++++++++++++++++++++++
 8 files changed, 252 insertions(+), 7 deletions(-)

diff --git a/docs/content/how-to/querying-tables.md 
b/docs/content/how-to/querying-tables.md
index a211cab38..ccd497c8a 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -269,6 +269,34 @@ NOTE: The consumer will prevent expiration of the 
snapshot. You can specify 'con
 lifetime of consumers.
 {{< /hint >}}
 
+You can reset the consumer with a given consumer ID so that it resumes reading from a given next snapshot ID.
+
+{{< hint info >}}
+First, stop the streaming job that uses this consumer ID, and then run the reset consumer action job.
+{{< /hint >}}
+
+{{< tabs "reset-consumer" >}}
+
+{{< tab "Flink" >}}
+
+Run the following command:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+    /path/to/paimon-flink-action-{{< version >}}.jar \
+    reset-consumer \
+    --warehouse <warehouse-path> \
+    --database <database-name> \
+    --table <table-name> \
+    --consumer-id <consumer-id> \
+    --next-snapshot <next-snapshot-id> \
+    [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
 ## Query Optimization
 
 {{< label Batch >}}{{< label Streaming >}}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java 
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 247f4372e..bf24dc753 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -58,7 +58,7 @@ public class ConsumerManager implements Serializable {
         return Consumer.fromPath(fileIO, consumerPath(consumerId));
     }
 
-    public void recordConsumer(String consumerId, Consumer consumer) {
+    public void resetConsumer(String consumerId, Consumer consumer) {
         try (PositionOutputStream out = 
fileIO.newOutputStream(consumerPath(consumerId), true)) {
             OutputStreamWriter writer = new OutputStreamWriter(out, 
StandardCharsets.UTF_8);
             writer.write(consumer.toJson());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 3040ee5b7..c30871378 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -238,7 +238,7 @@ public class InnerStreamTableScanImpl extends 
AbstractInnerTableScan
 
         String consumerId = options.consumerId();
         if (consumerId != null) {
-            snapshotReader.consumerManager().recordConsumer(consumerId, new 
Consumer(nextSnapshot));
+            snapshotReader.consumerManager().resetConsumer(consumerId, new 
Consumer(nextSnapshot));
         }
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java 
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
index cab65809d..a556a14f0 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -53,11 +53,11 @@ public class ConsumerManagerTest {
 
         assertThat(manager.minNextSnapshot()).isEmpty();
 
-        manager.recordConsumer("id1", new Consumer(5));
+        manager.resetConsumer("id1", new Consumer(5));
         consumer = manager.consumer("id1");
         assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(5L);
 
-        manager.recordConsumer("id2", new Consumer(8));
+        manager.resetConsumer("id2", new Consumer(8));
         consumer = manager.consumer("id2");
         assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
 
@@ -66,11 +66,11 @@ public class ConsumerManagerTest {
 
     @Test
     public void testExpire() throws Exception {
-        manager.recordConsumer("id1", new Consumer(1));
+        manager.resetConsumer("id1", new Consumer(1));
         Thread.sleep(1000);
         LocalDateTime expireDateTime = 
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
         Thread.sleep(1000);
-        manager.recordConsumer("id2", new Consumer(2));
+        manager.resetConsumer("id2", new Consumer(2));
 
         // check expire
         manager.expire(expireDateTime);
@@ -80,7 +80,7 @@ public class ConsumerManagerTest {
         // check last modification
         expireDateTime = 
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
         Thread.sleep(1000);
-        manager.recordConsumer("id2", new Consumer(3));
+        manager.resetConsumer("id2", new Consumer(3));
         manager.expire(expireDateTime);
         
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
new file mode 100644
index 000000000..8a6507fbb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.util.Map;
+
+/**
+ * Flink action that resets the progress of a table consumer, so that the consumer resumes from the
+ * given next snapshot id.
+ */
+public class ResetConsumerAction extends TableActionBase {
+
+    /** Id of the consumer whose progress is reset. */
+    private final String consumerId;
+
+    /** Snapshot id the consumer should read next once the reset has been applied. */
+    private final long nextSnapshotId;
+
+    protected ResetConsumerAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String consumerId,
+            long nextSnapshotId) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+        this.consumerId = consumerId;
+        this.nextSnapshotId = nextSnapshotId;
+    }
+
+    /**
+     * Records a {@link Consumer} positioned at {@code nextSnapshotId} for {@code consumerId} via
+     * {@link ConsumerManager#resetConsumer}, using the table's file IO and location.
+     */
+    @Override
+    public void run() throws Exception {
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location())
+                .resetConsumer(consumerId, new Consumer(nextSnapshotId));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
new file mode 100644
index 000000000..788cc519b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link ResetConsumerAction}. */
+public class ResetConsumerActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "reset-consumer";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        checkRequiredArgument(params, "consumer-id");
+        checkRequiredArgument(params, "next-snapshot");
+
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
+        String consumerId = params.get("consumer-id");
+        long nextSnapshotId = Long.parseLong(params.get("next-snapshot"));
+
+        ResetConsumerAction action =
+                new ResetConsumerAction(
+                        tablePath.f0,
+                        tablePath.f1,
+                        tablePath.f2,
+                        catalogConfig,
+                        consumerId,
+                        nextSnapshotId);
+        return Optional.of(action);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"reset-consumer\" reset a consumer from the given 
next snapshot.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  reset-consumer --warehouse <warehouse-path> --database 
<database-name> "
+                        + "--table <table-name> --consumer-id <consumer-id> 
--next-snapshot <next-snapshot-id>");
+        System.out.println();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7a62c0cea..f2cfb71ce 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+
+### action factories
 org.apache.paimon.flink.action.CompactActionFactory
 org.apache.paimon.flink.action.DropPartitionActionFactory
 org.apache.paimon.flink.action.DeleteActionFactory
@@ -21,6 +23,7 @@ org.apache.paimon.flink.action.MergeIntoActionFactory
 org.apache.paimon.flink.action.RollbackToActionFactory
 org.apache.paimon.flink.action.CreateTagActionFactory
 org.apache.paimon.flink.action.DeleteTagActionFactory
+org.apache.paimon.flink.action.ResetConsumerActionFactory
 org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableActionFactory
 org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
 org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableActionFactory
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
new file mode 100644
index 000000000..e33b3f42d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for consumer management actions such as {@link ResetConsumerAction}. */
+public class ConsumerActionITCase extends ActionITCaseBase {
+
+    @Test
+    public void testResetConsumer() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"pk1", "col1"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("pk1"),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // Three writes produce three snapshots.
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        // Stream the whole table with consumer 'myid' so that its progress gets recorded.
+        testStreamingRead(
+                        "SELECT * FROM `"
+                                + tableName
+                                + "` /*+ OPTIONS('consumer-id'='myid') */",
+                        Arrays.asList(
+                                changelogRow("+I", 1L, "Hi"),
+                                changelogRow("+I", 2L, "Hello"),
+                                changelogRow("+I", 3L, "Paimon")))
+                .close();
+
+        ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
+        // Having read all three snapshots, the consumer points at the following one.
+        assertNextSnapshot(consumerManager, "myid", 4L);
+
+        // Rewind the consumer back to the first snapshot.
+        new ResetConsumerAction(warehouse, database, tableName, Collections.emptyMap(), "myid", 1)
+                .run();
+
+        assertNextSnapshot(consumerManager, "myid", 1L);
+    }
+
+    /** Asserts that {@code consumerId} is recorded and will read {@code expected} next. */
+    private static void assertNextSnapshot(
+            ConsumerManager consumerManager, String consumerId, long expected) {
+        Optional<Consumer> consumer = consumerManager.consumer(consumerId);
+        assertThat(consumer).isPresent();
+        assertThat(consumer.get().nextSnapshot()).isEqualTo(expected);
+    }
+}

Reply via email to