This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 41844892f [core] Consumer-id support branch (#3790)
41844892f is described below
commit 41844892f9a3ca699788c1619cecc78176c4246d
Author: herefree <[email protected]>
AuthorDate: Tue Jul 23 17:47:05 2024 +0800
[core] Consumer-id support branch (#3790)
---
docs/content/maintenance/manage-branches.md | 1 +
.../apache/paimon/consumer/ConsumerManager.java | 15 +++-
.../paimon/table/AbstractFileStoreTable.java | 2 +-
.../apache/paimon/table/ExpireChangelogImpl.java | 5 +-
.../apache/paimon/table/ExpireSnapshotsImpl.java | 5 +-
.../table/source/snapshot/SnapshotReaderImpl.java | 5 +-
.../org/apache/paimon/utils/SnapshotManager.java | 4 ++
.../paimon/consumer/ConsumerManagerTest.java | 52 ++++++++++++++
.../paimon/flink/action/ResetConsumerAction.java | 5 +-
.../flink/procedure/ResetConsumerProcedure.java | 10 ++-
.../paimon/flink/action/ConsumerActionITCase.java | 84 ++++++++++++++++++++++
.../spark/procedure/ResetConsumerProcedure.java | 5 +-
12 files changed, 183 insertions(+), 10 deletions(-)
diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md
index 0cb52b6f2..4343214f2 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -118,6 +118,7 @@ You can read or write with branch as below.
```sql
-- read from branch 'branch1'
SELECT * FROM `t$branch_branch1`;
+SELECT * FROM `t$branch_branch1` /*+ OPTIONS('consumer-id' = 'myid') */;
-- write to branch 'branch1'
INSERT INTO `t$branch_branch1` SELECT ...
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 6a928b81c..093031b06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -21,6 +21,7 @@ package org.apache.paimon.consumer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.StringUtils;
import java.io.IOException;
import java.io.Serializable;
@@ -33,6 +34,8 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Collectors;
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.BranchManager.branchPath;
import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
@@ -46,9 +49,16 @@ public class ConsumerManager implements Serializable {
private final FileIO fileIO;
private final Path tablePath;
+ private final String branch;
+
public ConsumerManager(FileIO fileIO, Path tablePath) {
+ this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+ }
+
+ public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) {
this.fileIO = fileIO;
this.tablePath = tablePath;
+ this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH :
branchName;
}
public Optional<Consumer> consumer(String consumerId) {
@@ -119,10 +129,11 @@ public class ConsumerManager implements Serializable {
}
private Path consumerDirectory() {
- return new Path(tablePath + "/consumer");
+ return new Path(branchPath(tablePath, branch) + "/consumer");
}
private Path consumerPath(String consumerId) {
- return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX +
consumerId);
+ return new Path(
+ branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX
+ consumerId);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 0171ff677..818b6e87e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -374,7 +374,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
options.writeOnly() ? null : store().newTagCreationManager(),
catalogEnvironment.lockFactory().create(),
CoreOptions.fromMap(options()).consumerExpireTime(),
- new ConsumerManager(fileIO, path),
+ new ConsumerManager(fileIO, path, snapshotManager().branch()),
coreOptions().snapshotExpireExecutionMode(),
name(),
coreOptions().forceCreatingSnapshot());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index 6d5e76d5b..759088a06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -56,7 +56,10 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
this.snapshotManager = snapshotManager;
this.tagManager = tagManager;
this.consumerManager =
- new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ new ConsumerManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
this.changelogDeletion = changelogDeletion;
this.expireConfig = ExpireConfig.builder().build();
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 73fac37e6..170047297 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -56,7 +56,10 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
TagManager tagManager) {
this.snapshotManager = snapshotManager;
this.consumerManager =
- new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ new ConsumerManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
this.snapshotDeletion = snapshotDeletion;
this.tagManager = tagManager;
this.expireConfig = ExpireConfig.builder().build();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index bd9e48e4c..1b58fea91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -98,7 +98,10 @@ public class SnapshotReaderImpl implements SnapshotReader {
this.deletionVectors = options.deletionVectorsEnabled();
this.snapshotManager = snapshotManager;
this.consumerManager =
- new ConsumerManager(snapshotManager.fileIO(),
snapshotManager.tablePath());
+ new ConsumerManager(
+ snapshotManager.fileIO(),
+ snapshotManager.tablePath(),
+ snapshotManager.branch());
this.splitGenerator = splitGenerator;
this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
this.defaultValueAssigner = defaultValueAssigner;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index ca88259de..cf0b44b5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -90,6 +90,10 @@ public class SnapshotManager implements Serializable {
return tablePath;
}
+ public String branch() {
+ return branch;
+ }
+
public Path changelogDirectory() {
return new Path(branchPath(tablePath, branch) + "/changelog");
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
index 9ee320754..9ed685a4d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -39,11 +39,18 @@ public class ConsumerManagerTest {
private ConsumerManager manager;
+ private ConsumerManager consumerManagerBranch;
+
@BeforeEach
public void before() {
this.manager =
new ConsumerManager(
LocalFileIO.create(), new
org.apache.paimon.fs.Path(tempDir.toUri()));
+ this.consumerManagerBranch =
+ new ConsumerManager(
+ LocalFileIO.create(),
+ new org.apache.paimon.fs.Path(tempDir.toUri()),
+ "branch1");
}
@Test
@@ -62,6 +69,21 @@ public class ConsumerManagerTest {
assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
+
+ Optional<Consumer> consumerBranch =
consumerManagerBranch.consumer("id1");
+ assertThat(consumerBranch).isEmpty();
+
+ assertThat(consumerManagerBranch.minNextSnapshot()).isEmpty();
+
+ consumerManagerBranch.resetConsumer("id1", new Consumer(5));
+ consumerBranch = consumerManagerBranch.consumer("id1");
+
assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(5L);
+
+ consumerManagerBranch.resetConsumer("id2", new Consumer(8));
+ consumerBranch = consumerManagerBranch.consumer("id2");
+
assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(8L);
+
+
assertThat(consumerManagerBranch.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
}
@Test
@@ -83,11 +105,41 @@ public class ConsumerManagerTest {
manager.resetConsumer("id2", new Consumer(3));
manager.expire(expireDateTime);
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);
+
+ consumerManagerBranch.resetConsumer("id3", new Consumer(1));
+ Thread.sleep(1000);
+ LocalDateTime expireDateTimeBranch =
+ DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+ Thread.sleep(1000);
+ consumerManagerBranch.resetConsumer("id4", new Consumer(2));
+
+ // check expire
+ consumerManagerBranch.expire(expireDateTimeBranch);
+ assertThat(consumerManagerBranch.consumer("id3")).isEmpty();
+ assertThat(consumerManagerBranch.consumer("id4"))
+ .map(Consumer::nextSnapshot)
+ .get()
+ .isEqualTo(2L);
+
+ // check last modification
+ expireDateTimeBranch =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+ Thread.sleep(1000);
+ consumerManagerBranch.resetConsumer("id4", new Consumer(3));
+ consumerManagerBranch.expire(expireDateTimeBranch);
+ assertThat(consumerManagerBranch.consumer("id4"))
+ .map(Consumer::nextSnapshot)
+ .get()
+ .isEqualTo(3L);
}
@Test
public void testReadConsumer() throws Exception {
manager.resetConsumer("id1", new Consumer(5));
assertThat(manager.consumer("id1"));
+
+ consumerManagerBranch.resetConsumer("id2", new Consumer(5));
+ assertThat(consumerManagerBranch.consumer("id2"));
+
+ assertThat(manager.consumer("id2")).isEmpty();
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
index d4d13cc2e..615b448ec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -50,7 +50,10 @@ public class ResetConsumerAction extends TableActionBase {
public void run() throws Exception {
FileStoreTable dataTable = (FileStoreTable) table;
ConsumerManager consumerManager =
- new ConsumerManager(dataTable.fileIO(), dataTable.location());
+ new ConsumerManager(
+ dataTable.fileIO(),
+ dataTable.location(),
+ dataTable.snapshotManager().branch());
if (Objects.isNull(nextSnapshotId)) {
consumerManager.deleteConsumer(consumerId);
} else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 6ff4df5a1..0355d6dc1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -50,7 +50,10 @@ public class ResetConsumerProcedure extends ProcedureBase {
FileStoreTable fileStoreTable =
(FileStoreTable)
catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
- new ConsumerManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
consumerManager.resetConsumer(consumerId, new
Consumer(nextSnapshotId));
return new String[] {"Success"};
@@ -61,7 +64,10 @@ public class ResetConsumerProcedure extends ProcedureBase {
FileStoreTable fileStoreTable =
(FileStoreTable)
catalog.getTable(Identifier.fromString(tableId));
ConsumerManager consumerManager =
- new ConsumerManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
consumerManager.deleteConsumer(consumerId);
return new String[] {"Success"};
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 4818c97e6..6a17a4512 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -119,4 +119,88 @@ public class ConsumerActionITCase extends ActionITCaseBase {
Optional<Consumer> consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();
}
+
+ @Test
+ public void testResetBranchConsumer() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
+ new String[] {"pk1", "col1"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk1"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ String branchName = "b1";
+ table.createBranch("b1", 3);
+ String branchTableName = tableName + "$branch_b1";
+
+ // use consumer streaming read table
+ testStreamingRead(
+ "SELECT * FROM `"
+ + branchTableName
+ + "` /*+
OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
+ Arrays.asList(
+ changelogRow("+I", 1L, "Hi"),
+ changelogRow("+I", 2L, "Hello"),
+ changelogRow("+I", 3L, "Paimon")))
+ .close();
+
+ ConsumerManager consumerManager =
+ new ConsumerManager(table.fileIO(), table.location(),
branchName);
+ Optional<Consumer> consumer1 = consumerManager.consumer("myid");
+ assertThat(consumer1).isPresent();
+ assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
+
+ List<String> args =
+ Arrays.asList(
+ "reset_consumer",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ branchTableName,
+ "--consumer_id",
+ "myid",
+ "--next_snapshot",
+ "1");
+ // reset consumer
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ createAction(ResetConsumerAction.class, args).run();
+ } else {
+ callProcedure(
+ String.format(
+ "CALL sys.reset_consumer('%s.%s', 'myid', 1)",
+ database, branchTableName));
+ }
+ Optional<Consumer> consumer2 = consumerManager.consumer("myid");
+ assertThat(consumer2).isPresent();
+ assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
+
+ // delete consumer
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
+ } else {
+ callProcedure(
+ String.format(
+ "CALL sys.reset_consumer('%s.%s', 'myid')",
database, branchTableName));
+ }
+ Optional<Consumer> consumer3 = consumerManager.consumer("myid");
+ assertThat(consumer3).isNotPresent();
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
index aafed90a3..a13227e95 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
@@ -83,7 +83,10 @@ public class ResetConsumerProcedure extends BaseProcedure {
table -> {
FileStoreTable fileStoreTable = (FileStoreTable) table;
ConsumerManager consumerManager =
- new ConsumerManager(fileStoreTable.fileIO(),
fileStoreTable.location());
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
if (nextSnapshotId == null) {
consumerManager.deleteConsumer(consumerId);
} else {