This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f49afd862 [core] Reset consumer action supports delete a consumer with
a given consumer ID. (#2106)
f49afd862 is described below
commit f49afd862192c65c3d80fe6638f54f4ed5a1f2d4
Author: Kerwin <[email protected]>
AuthorDate: Tue Oct 10 15:18:49 2023 +0800
[core] Reset consumer action supports delete a consumer with a given
consumer ID. (#2106)
---
docs/content/how-to/querying-tables.md | 6 ++++--
.../java/org/apache/paimon/utils/StringUtils.java | 6 +++---
.../apache/paimon/consumer/ConsumerManager.java | 4 ++++
.../paimon/flink/action/ResetConsumerAction.java | 16 +++++++++++----
.../flink/action/ResetConsumerActionFactory.java | 23 ++++++++++++----------
.../flink/procedure/ResetConsumerProcedure.java | 17 +++++++++++++++-
.../flink/procedure/RollbackToProcedure.java | 2 +-
.../paimon/flink/action/ConsumerActionITCase.java | 15 ++++++++++++--
8 files changed, 66 insertions(+), 23 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md b/docs/content/how-to/querying-tables.md
index 5e2abd0ec..286103d12 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -303,7 +303,7 @@ NOTE: The consumer will prevent expiration of the snapshot. You can specify 'con
lifetime of consumers.
{{< /hint >}}
-You can reset a consumer with a given consumer ID and next snapshot ID.
+You can reset a consumer with a given consumer ID and next snapshot ID and delete a consumer with a given consumer ID.
{{< hint info >}}
First, you need to stop the streaming task using this consumer ID, and then execute the reset consumer action job.
@@ -323,10 +323,12 @@ Run the following command:
--database <database-name> \
--table <table-name> \
--consumer-id <consumer-id> \
- --next-snapshot <next-snapshot-id> \
+ [--next-snapshot <next-snapshot-id>] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]]
```
+please don't specify --next-snapshot parameter if you want to delete the consumer.
+
{{< /tab >}}
{{< /tabs >}}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
index 9bd5e3d9e..adc4ce553 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/StringUtils.java
@@ -93,7 +93,7 @@ public class StringUtils {
* @return True, if the string is null or blank, false otherwise.
*/
public static boolean isNullOrWhitespaceOnly(String str) {
- if (str == null || str.length() == 0) {
+ if (str == null || str.isEmpty()) {
return true;
}
@@ -302,8 +302,8 @@ public class StringUtils {
}
final int replLength = searchString.length();
int increase = replacement.length() - replLength;
- increase = increase < 0 ? 0 : increase;
- increase *= max < 0 ? 16 : max > 64 ? 64 : max;
+ increase = Math.max(increase, 0);
+ increase *= max < 0 ? 16 : Math.min(max, 64);
final StringBuilder buf = new StringBuilder(text.length() + increase);
while (end != INDEX_NOT_FOUND) {
buf.append(text, start, end).append(replacement);
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index bf24dc753..b788c4a28 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -68,6 +68,10 @@ public class ConsumerManager implements Serializable {
}
}
+ public void deleteConsumer(String consumerId) {
+ fileIO.deleteQuietly(consumerPath(consumerId));
+ }
+
public OptionalLong minNextSnapshot() {
try {
return listOriginalVersionedFiles(fileIO, consumerDirectory(),
CONSUMER_PREFIX)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
index 8a6507fbb..d4d13cc2e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -23,23 +23,27 @@ import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.table.FileStoreTable;
import java.util.Map;
+import java.util.Objects;
/** Reset consumer action for Flink. */
public class ResetConsumerAction extends TableActionBase {
private final String consumerId;
- private final long nextSnapshotId;
+ private Long nextSnapshotId;
protected ResetConsumerAction(
String warehouse,
String databaseName,
String tableName,
Map<String, String> catalogConfig,
- String consumerId,
- long nextSnapshotId) {
+ String consumerId) {
super(warehouse, databaseName, tableName, catalogConfig);
this.consumerId = consumerId;
+ }
+
+ public ResetConsumerAction withNextSnapshotIds(Long nextSnapshotId) {
this.nextSnapshotId = nextSnapshotId;
+ return this;
}
@Override
@@ -47,6 +51,10 @@ public class ResetConsumerAction extends TableActionBase {
FileStoreTable dataTable = (FileStoreTable) table;
ConsumerManager consumerManager =
new ConsumerManager(dataTable.fileIO(), dataTable.location());
- consumerManager.resetConsumer(consumerId, new
Consumer(nextSnapshotId));
+ if (Objects.isNull(nextSnapshotId)) {
+ consumerManager.deleteConsumer(consumerId);
+ } else {
+ consumerManager.resetConsumer(consumerId, new
Consumer(nextSnapshotId));
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
index 788cc519b..d8e04bf0e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerActionFactory.java
@@ -37,34 +37,37 @@ public class ResetConsumerActionFactory implements ActionFactory {
@Override
public Optional<Action> create(MultipleParameterTool params) {
checkRequiredArgument(params, "consumer-id");
- checkRequiredArgument(params, "next-snapshot");
Tuple3<String, String, String> tablePath = getTablePath(params);
Map<String, String> catalogConfig = optionalConfigMap(params,
"catalog-conf");
String consumerId = params.get("consumer-id");
- long nextSnapshotId = Long.parseLong(params.get("next-snapshot"));
ResetConsumerAction action =
new ResetConsumerAction(
- tablePath.f0,
- tablePath.f1,
- tablePath.f2,
- catalogConfig,
- consumerId,
- nextSnapshotId);
+ tablePath.f0, tablePath.f1, tablePath.f2,
catalogConfig, consumerId);
+
+ if (params.has("next-snapshot")) {
+
action.withNextSnapshotIds(Long.parseLong(params.get("next-snapshot")));
+ }
+
return Optional.of(action);
}
@Override
public void printHelp() {
System.out.println(
- "Action \"reset-consumer\" reset a consumer from the given
next snapshot.");
+ "Action \"reset-consumer\" reset a consumer with a given
consumer ID and next snapshot ID and delete a consumer with a given consumer
ID.");
System.out.println();
System.out.println("Syntax:");
System.out.println(
" reset-consumer --warehouse <warehouse-path> --database
<database-name> "
- + "--table <table-name> --consumer-id <consumer-id>
--next-snapshot <next-snapshot-id>");
+ + "--table <table-name> --consumer-id <consumer-id>
[--next-snapshot <next-snapshot-id>]");
+
+ System.out.println();
+ System.out.println("Note:");
+ System.out.println(
+ " please don't specify --next-snapshot parameter if you want
to delete the consumer.");
System.out.println();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 70a409b0b..8b6b4ee5b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -27,10 +27,14 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.flink.table.procedure.ProcedureContext;
/**
- * Drop partition procedure. Usage:
+ * Reset consumer procedure. Usage:
*
* <pre><code>
+ * -- reset the new next snapshot id in the consumer
* CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ *
+ * -- delete consumer
+ * CALL reset_consumer('tableId', 'consumerId')
* </code></pre>
*/
public class ResetConsumerProcedure extends ProcedureBase {
@@ -55,4 +59,15 @@ public class ResetConsumerProcedure extends ProcedureBase {
return new String[] {"Success"};
}
+
+ public String[] call(ProcedureContext procedureContext, String tableId,
String consumerId)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable =
+ (FileStoreTable)
catalog.getTable(Identifier.fromString(tableId));
+ ConsumerManager consumerManager =
+ new ConsumerManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+ consumerManager.deleteConsumer(consumerId);
+
+ return new String[] {"Success"};
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
index b8d6dd38c..a59dd238f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -25,7 +25,7 @@ import org.apache.paimon.table.Table;
import org.apache.flink.table.procedure.ProcedureContext;
/**
- * Drop partition procedure. Usage:
+ * Rollback procedure. Usage:
*
* <pre><code>
* -- rollback to a snapshot
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 444d2f71e..c3aee72c5 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -82,8 +82,8 @@ public class ConsumerActionITCase extends ActionITCaseBase {
// reset consumer
if (ThreadLocalRandom.current().nextBoolean()) {
- new ResetConsumerAction(
- warehouse, database, tableName,
Collections.emptyMap(), "myid", 1)
+ new ResetConsumerAction(warehouse, database, tableName,
Collections.emptyMap(), "myid")
+ .withNextSnapshotIds(1L)
.run();
} else {
callProcedure(
@@ -92,5 +92,16 @@ public class ConsumerActionITCase extends ActionITCaseBase {
Optional<Consumer> consumer2 = consumerManager.consumer("myid");
assertThat(consumer2).isPresent();
assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
+
+ // delete consumer
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new ResetConsumerAction(warehouse, database, tableName,
Collections.emptyMap(), "myid")
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL reset_consumer('%s.%s', 'myid')",
database, tableName));
+ }
+ Optional<Consumer> consumer3 = consumerManager.consumer("myid");
+ assertThat(consumer3).isNotPresent();
}
}