This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9e55f27ff [action] Support Reset next snapshot id of consumer id.
(#1586)
9e55f27ff is described below
commit 9e55f27ff08c7a7f5dd17ad254467fd7ceeaaec2
Author: Kerwin <[email protected]>
AuthorDate: Mon Jul 24 18:10:04 2023 +0800
[action] Support Reset next snapshot id of consumer id. (#1586)
---
docs/content/how-to/querying-tables.md | 28 +++++++
.../apache/paimon/consumer/ConsumerManager.java | 2 +-
.../table/source/InnerStreamTableScanImpl.java | 2 +-
.../paimon/consumer/ConsumerManagerTest.java | 10 +--
.../paimon/flink/action/ResetConsumerAction.java | 52 ++++++++++++
.../flink/action/ResetConsumerActionFactory.java | 70 ++++++++++++++++
.../services/org.apache.paimon.factories.Factory | 3 +
.../paimon/flink/action/ConsumerActionITCase.java | 92 ++++++++++++++++++++++
8 files changed, 252 insertions(+), 7 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index a211cab38..ccd497c8a 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -269,6 +269,34 @@ NOTE: The consumer will prevent expiration of the
snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}
+You can reset a consumer, identified by its consumer ID, to a given next snapshot ID (the ID of the next snapshot that the consumer will read).
+
+{{< hint info >}}
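+First, stop the streaming job that uses this consumer ID, then run the reset-consumer action job. Otherwise the running job keeps recording its own progress for this consumer ID and may overwrite the reset position.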
+{{< /hint >}}
+
+{{< tabs "reset-consumer" >}}
+
+{{< tab "Flink" >}}
+
+Run the following command:
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ reset-consumer \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ --consumer-id <consumer-id> \
+ --next-snapshot <next-snapshot-id> \
+ [--catalog-conf <paimon-catalog-conf> [--catalog-conf <paimon-catalog-conf> ...]]
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
## Query Optimization
{{< label Batch >}}{{< label Streaming >}}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 247f4372e..bf24dc753 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -58,7 +58,7 @@ public class ConsumerManager implements Serializable {
return Consumer.fromPath(fileIO, consumerPath(consumerId));
}
- public void recordConsumer(String consumerId, Consumer consumer) {
+ public void resetConsumer(String consumerId, Consumer consumer) {
try (PositionOutputStream out =
fileIO.newOutputStream(consumerPath(consumerId), true)) {
OutputStreamWriter writer = new OutputStreamWriter(out,
StandardCharsets.UTF_8);
writer.write(consumer.toJson());
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 3040ee5b7..c30871378 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -238,7 +238,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
String consumerId = options.consumerId();
if (consumerId != null) {
- snapshotReader.consumerManager().recordConsumer(consumerId, new
Consumer(nextSnapshot));
+ snapshotReader.consumerManager().resetConsumer(consumerId, new
Consumer(nextSnapshot));
}
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
index cab65809d..a556a14f0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -53,11 +53,11 @@ public class ConsumerManagerTest {
assertThat(manager.minNextSnapshot()).isEmpty();
- manager.recordConsumer("id1", new Consumer(5));
+ manager.resetConsumer("id1", new Consumer(5));
consumer = manager.consumer("id1");
assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(5L);
- manager.recordConsumer("id2", new Consumer(8));
+ manager.resetConsumer("id2", new Consumer(8));
consumer = manager.consumer("id2");
assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
@@ -66,11 +66,11 @@ public class ConsumerManagerTest {
@Test
public void testExpire() throws Exception {
- manager.recordConsumer("id1", new Consumer(1));
+ manager.resetConsumer("id1", new Consumer(1));
Thread.sleep(1000);
LocalDateTime expireDateTime =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
Thread.sleep(1000);
- manager.recordConsumer("id2", new Consumer(2));
+ manager.resetConsumer("id2", new Consumer(2));
// check expire
manager.expire(expireDateTime);
@@ -80,7 +80,7 @@ public class ConsumerManagerTest {
// check last modification
expireDateTime =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
Thread.sleep(1000);
- manager.recordConsumer("id2", new Consumer(3));
+ manager.resetConsumer("id2", new Consumer(3));
manager.expire(expireDateTime);
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
new file mode 100644
index 000000000..8a6507fbb
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+
+import java.util.Map;
+
+/** Reset consumer action for Flink. */
+public class ResetConsumerAction extends TableActionBase {
+
+ private final String consumerId;
+ private final long nextSnapshotId;
+
+ protected ResetConsumerAction(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ Map<String, String> catalogConfig,
+ String consumerId,
+ long nextSnapshotId) {
+ super(warehouse, databaseName, tableName, catalogConfig);
+ this.consumerId = consumerId;
+ this.nextSnapshotId = nextSnapshotId;
+ }
+
+ @Override
+ public void run() throws Exception {
+ FileStoreTable dataTable = (FileStoreTable) table;
+ ConsumerManager consumerManager =
+ new ConsumerManager(dataTable.fileIO(), dataTable.location());
+ consumerManager.resetConsumer(consumerId, new
Consumer(nextSnapshotId));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
new file mode 100644
index 000000000..788cc519b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link ResetConsumerAction}. */
+public class ResetConsumerActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "reset-consumer";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterTool params) {
+ checkRequiredArgument(params, "consumer-id");
+ checkRequiredArgument(params, "next-snapshot");
+
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
+ String consumerId = params.get("consumer-id");
+ long nextSnapshotId = Long.parseLong(params.get("next-snapshot"));
+
+ ResetConsumerAction action =
+ new ResetConsumerAction(
+ tablePath.f0,
+ tablePath.f1,
+ tablePath.f2,
+ catalogConfig,
+ consumerId,
+ nextSnapshotId);
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"reset-consumer\" reset a consumer from the given
next snapshot.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " reset-consumer --warehouse <warehouse-path> --database
<database-name> "
+ + "--table <table-name> --consumer-id <consumer-id>
--next-snapshot <next-snapshot-id>");
+ System.out.println();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 7a62c0cea..f2cfb71ce 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -14,6 +14,8 @@
# limitations under the License.
org.apache.paimon.flink.kafka.KafkaLogStoreFactory
+
+### action factories
org.apache.paimon.flink.action.CompactActionFactory
org.apache.paimon.flink.action.DropPartitionActionFactory
org.apache.paimon.flink.action.DeleteActionFactory
@@ -21,6 +23,7 @@ org.apache.paimon.flink.action.MergeIntoActionFactory
org.apache.paimon.flink.action.RollbackToActionFactory
org.apache.paimon.flink.action.CreateTagActionFactory
org.apache.paimon.flink.action.DeleteTagActionFactory
+org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncTableActionFactory
org.apache.paimon.flink.action.cdc.mysql.MySqlSyncDatabaseActionFactory
org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableActionFactory
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
new file mode 100644
index 000000000..e33b3f42d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Optional;
+
+import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testStreamingRead;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for consumer management actions. */
+public class ConsumerActionITCase extends ActionITCaseBase {
+
+ @Test
+ public void testResetConsumer() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"pk1", "col1"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk1"),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // use consumer streaming read table
+ testStreamingRead(
+ "SELECT * FROM `" + tableName + "` /*+
OPTIONS('consumer-id'='myid') */",
+ Arrays.asList(
+ changelogRow("+I", 1L, "Hi"),
+ changelogRow("+I", 2L, "Hello"),
+ changelogRow("+I", 3L, "Paimon")))
+ .close();
+
+ ConsumerManager consumerManager = new ConsumerManager(table.fileIO(),
table.location());
+ Optional<Consumer> consumer1 = consumerManager.consumer("myid");
+ assertThat(consumer1).isPresent();
+ assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
+
+ // reset consumer
+ ResetConsumerAction resetConsumerAction =
+ new ResetConsumerAction(
+ warehouse, database, tableName,
Collections.emptyMap(), "myid", 1);
+ resetConsumerAction.run();
+
+ Optional<Consumer> consumer2 = consumerManager.consumer("myid");
+ assertThat(consumer2).isPresent();
+ assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
+ }
+}