This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 41844892f [core] Consumer-id support branch (#3790)
41844892f is described below

commit 41844892f9a3ca699788c1619cecc78176c4246d
Author: herefree <[email protected]>
AuthorDate: Tue Jul 23 17:47:05 2024 +0800

    [core] Consumer-id support branch (#3790)
---
 docs/content/maintenance/manage-branches.md        |  1 +
 .../apache/paimon/consumer/ConsumerManager.java    | 15 +++-
 .../paimon/table/AbstractFileStoreTable.java       |  2 +-
 .../apache/paimon/table/ExpireChangelogImpl.java   |  5 +-
 .../apache/paimon/table/ExpireSnapshotsImpl.java   |  5 +-
 .../table/source/snapshot/SnapshotReaderImpl.java  |  5 +-
 .../org/apache/paimon/utils/SnapshotManager.java   |  4 ++
 .../paimon/consumer/ConsumerManagerTest.java       | 52 ++++++++++++++
 .../paimon/flink/action/ResetConsumerAction.java   |  5 +-
 .../flink/procedure/ResetConsumerProcedure.java    | 10 ++-
 .../paimon/flink/action/ConsumerActionITCase.java  | 84 ++++++++++++++++++++++
 .../spark/procedure/ResetConsumerProcedure.java    |  5 +-
 12 files changed, 183 insertions(+), 10 deletions(-)

diff --git a/docs/content/maintenance/manage-branches.md b/docs/content/maintenance/manage-branches.md
index 0cb52b6f2..4343214f2 100644
--- a/docs/content/maintenance/manage-branches.md
+++ b/docs/content/maintenance/manage-branches.md
@@ -118,6 +118,7 @@ You can read or write with branch as below.
 ```sql
 -- read from branch 'branch1'
 SELECT * FROM `t$branch_branch1`;
+SELECT * FROM `t$branch_branch1` /*+ OPTIONS('consumer-id' = 'myid') */;
 
 -- write to branch 'branch1'
 INSERT INTO `t$branch_branch1` SELECT ...
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 6a928b81c..093031b06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -21,6 +21,7 @@ package org.apache.paimon.consumer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.StringUtils;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -33,6 +34,8 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static org.apache.paimon.utils.BranchManager.branchPath;
 import static org.apache.paimon.utils.FileUtils.listOriginalVersionedFiles;
 import static org.apache.paimon.utils.FileUtils.listVersionedFileStatus;
 
@@ -46,9 +49,16 @@ public class ConsumerManager implements Serializable {
     private final FileIO fileIO;
     private final Path tablePath;
 
+    private final String branch;
+
     public ConsumerManager(FileIO fileIO, Path tablePath) {
+        this(fileIO, tablePath, DEFAULT_MAIN_BRANCH);
+    }
+
+    public ConsumerManager(FileIO fileIO, Path tablePath, String branchName) {
         this.fileIO = fileIO;
         this.tablePath = tablePath;
+        this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName;
     }
 
     public Optional<Consumer> consumer(String consumerId) {
@@ -119,10 +129,11 @@ public class ConsumerManager implements Serializable {
     }
 
     private Path consumerDirectory() {
-        return new Path(tablePath + "/consumer");
+        return new Path(branchPath(tablePath, branch) + "/consumer");
     }
 
     private Path consumerPath(String consumerId) {
-        return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX + consumerId);
+        return new Path(
+                branchPath(tablePath, branch) + "/consumer/" + CONSUMER_PREFIX + consumerId);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 0171ff677..818b6e87e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -374,7 +374,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
                 options.writeOnly() ? null : store().newTagCreationManager(),
                 catalogEnvironment.lockFactory().create(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
-                new ConsumerManager(fileIO, path),
+                new ConsumerManager(fileIO, path, snapshotManager().branch()),
                 coreOptions().snapshotExpireExecutionMode(),
                 name(),
                 coreOptions().forceCreatingSnapshot());
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
index 6d5e76d5b..759088a06 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java
@@ -56,7 +56,10 @@ public class ExpireChangelogImpl implements ExpireSnapshots {
         this.snapshotManager = snapshotManager;
         this.tagManager = tagManager;
         this.consumerManager =
-                new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
+                new ConsumerManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch());
         this.changelogDeletion = changelogDeletion;
         this.expireConfig = ExpireConfig.builder().build();
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
index 73fac37e6..170047297 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java
@@ -56,7 +56,10 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots {
             TagManager tagManager) {
         this.snapshotManager = snapshotManager;
         this.consumerManager =
-                new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
+                new ConsumerManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch());
         this.snapshotDeletion = snapshotDeletion;
         this.tagManager = tagManager;
         this.expireConfig = ExpireConfig.builder().build();
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index bd9e48e4c..1b58fea91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -98,7 +98,10 @@ public class SnapshotReaderImpl implements SnapshotReader {
         this.deletionVectors = options.deletionVectorsEnabled();
         this.snapshotManager = snapshotManager;
         this.consumerManager =
-                new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath());
+                new ConsumerManager(
+                        snapshotManager.fileIO(),
+                        snapshotManager.tablePath(),
+                        snapshotManager.branch());
         this.splitGenerator = splitGenerator;
         this.nonPartitionFilterConsumer = nonPartitionFilterConsumer;
         this.defaultValueAssigner = defaultValueAssigner;
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
index ca88259de..cf0b44b5b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java
@@ -90,6 +90,10 @@ public class SnapshotManager implements Serializable {
         return tablePath;
     }
 
+    public String branch() {
+        return branch;
+    }
+
     public Path changelogDirectory() {
         return new Path(branchPath(tablePath, branch) + "/changelog");
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
index 9ee320754..9ed685a4d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -39,11 +39,18 @@ public class ConsumerManagerTest {
 
     private ConsumerManager manager;
 
+    private ConsumerManager consumerManagerBranch;
+
     @BeforeEach
     public void before() {
         this.manager =
                 new ConsumerManager(
                         LocalFileIO.create(), new org.apache.paimon.fs.Path(tempDir.toUri()));
+        this.consumerManagerBranch =
+                new ConsumerManager(
+                        LocalFileIO.create(),
+                        new org.apache.paimon.fs.Path(tempDir.toUri()),
+                        "branch1");
     }
 
     @Test
@@ -62,6 +69,21 @@ public class ConsumerManagerTest {
         assertThat(consumer).map(Consumer::nextSnapshot).get().isEqualTo(8L);
 
         assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
+
+        Optional<Consumer> consumerBranch = consumerManagerBranch.consumer("id1");
+        assertThat(consumerBranch).isEmpty();
+
+        assertThat(consumerManagerBranch.minNextSnapshot()).isEmpty();
+
+        consumerManagerBranch.resetConsumer("id1", new Consumer(5));
+        consumerBranch = consumerManagerBranch.consumer("id1");
+        assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(5L);
+
+        consumerManagerBranch.resetConsumer("id2", new Consumer(8));
+        consumerBranch = consumerManagerBranch.consumer("id2");
+        assertThat(consumerBranch).map(Consumer::nextSnapshot).get().isEqualTo(8L);
+
+        assertThat(consumerManagerBranch.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
     }
 
     @Test
@@ -83,11 +105,41 @@ public class ConsumerManagerTest {
         manager.resetConsumer("id2", new Consumer(3));
         manager.expire(expireDateTime);
         assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);
+
+        consumerManagerBranch.resetConsumer("id3", new Consumer(1));
+        Thread.sleep(1000);
+        LocalDateTime expireDateTimeBranch =
+                DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+        Thread.sleep(1000);
+        consumerManagerBranch.resetConsumer("id4", new Consumer(2));
+
+        // check expire
+        consumerManagerBranch.expire(expireDateTimeBranch);
+        assertThat(consumerManagerBranch.consumer("id3")).isEmpty();
+        assertThat(consumerManagerBranch.consumer("id4"))
+                .map(Consumer::nextSnapshot)
+                .get()
+                .isEqualTo(2L);
+
+        // check last modification
+        expireDateTimeBranch = DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+        Thread.sleep(1000);
+        consumerManagerBranch.resetConsumer("id4", new Consumer(3));
+        consumerManagerBranch.expire(expireDateTimeBranch);
+        assertThat(consumerManagerBranch.consumer("id4"))
+                .map(Consumer::nextSnapshot)
+                .get()
+                .isEqualTo(3L);
     }
 
     @Test
     public void testReadConsumer() throws Exception {
         manager.resetConsumer("id1", new Consumer(5));
         assertThat(manager.consumer("id1"));
+
+        consumerManagerBranch.resetConsumer("id2", new Consumer(5));
+        assertThat(consumerManagerBranch.consumer("id2"));
+
+        assertThat(manager.consumer("id2")).isEmpty();
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
index d4d13cc2e..615b448ec 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ResetConsumerAction.java
@@ -50,7 +50,10 @@ public class ResetConsumerAction extends TableActionBase {
     public void run() throws Exception {
         FileStoreTable dataTable = (FileStoreTable) table;
         ConsumerManager consumerManager =
-                new ConsumerManager(dataTable.fileIO(), dataTable.location());
+                new ConsumerManager(
+                        dataTable.fileIO(),
+                        dataTable.location(),
+                        dataTable.snapshotManager().branch());
         if (Objects.isNull(nextSnapshotId)) {
             consumerManager.deleteConsumer(consumerId);
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
index 6ff4df5a1..0355d6dc1 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -50,7 +50,10 @@ public class ResetConsumerProcedure extends ProcedureBase {
         FileStoreTable fileStoreTable =
                 (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
         ConsumerManager consumerManager =
-                new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
+                new ConsumerManager(
+                        fileStoreTable.fileIO(),
+                        fileStoreTable.location(),
+                        fileStoreTable.snapshotManager().branch());
         consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
 
         return new String[] {"Success"};
@@ -61,7 +64,10 @@ public class ResetConsumerProcedure extends ProcedureBase {
         FileStoreTable fileStoreTable =
                 (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
         ConsumerManager consumerManager =
-                new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
+                new ConsumerManager(
+                        fileStoreTable.fileIO(),
+                        fileStoreTable.location(),
+                        fileStoreTable.snapshotManager().branch());
         consumerManager.deleteConsumer(consumerId);
 
         return new String[] {"Success"};
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 4818c97e6..6a17a4512 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -119,4 +119,88 @@ public class ConsumerActionITCase extends ActionITCaseBase {
         Optional<Consumer> consumer3 = consumerManager.consumer("myid");
         assertThat(consumer3).isNotPresent();
     }
+
+    @Test
+    public void testResetBranchConsumer() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+                        new String[] {"pk1", "col1"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(),
+                        Collections.singletonList("pk1"),
+                        Collections.emptyList(),
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // 3 snapshots
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        String branchName = "b1";
+        table.createBranch("b1", 3);
+        String branchTableName = tableName + "$branch_b1";
+
+        // use consumer streaming read table
+        testStreamingRead(
+                        "SELECT * FROM `"
+                                + branchTableName
+                                + "` /*+ OPTIONS('consumer-id'='myid','consumer.expiration-time'='3h') */",
+                        Arrays.asList(
+                                changelogRow("+I", 1L, "Hi"),
+                                changelogRow("+I", 2L, "Hello"),
+                                changelogRow("+I", 3L, "Paimon")))
+                .close();
+
+        ConsumerManager consumerManager =
+                new ConsumerManager(table.fileIO(), table.location(), branchName);
+        Optional<Consumer> consumer1 = consumerManager.consumer("myid");
+        assertThat(consumer1).isPresent();
+        assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
+
+        List<String> args =
+                Arrays.asList(
+                        "reset_consumer",
+                        "--warehouse",
+                        warehouse,
+                        "--database",
+                        database,
+                        "--table",
+                        branchTableName,
+                        "--consumer_id",
+                        "myid",
+                        "--next_snapshot",
+                        "1");
+        // reset consumer
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            createAction(ResetConsumerAction.class, args).run();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL sys.reset_consumer('%s.%s', 'myid', 1)",
+                            database, branchTableName));
+        }
+        Optional<Consumer> consumer2 = consumerManager.consumer("myid");
+        assertThat(consumer2).isPresent();
+        assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
+
+        // delete consumer
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            createAction(ResetConsumerAction.class, args.subList(0, 9)).run();
+        } else {
+            callProcedure(
+                    String.format(
+                            "CALL sys.reset_consumer('%s.%s', 'myid')", database, branchTableName));
+        }
+        Optional<Consumer> consumer3 = consumerManager.consumer("myid");
+        assertThat(consumer3).isNotPresent();
+    }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
index aafed90a3..a13227e95 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ResetConsumerProcedure.java
@@ -83,7 +83,10 @@ public class ResetConsumerProcedure extends BaseProcedure {
                 table -> {
                     FileStoreTable fileStoreTable = (FileStoreTable) table;
                     ConsumerManager consumerManager =
-                            new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
+                            new ConsumerManager(
+                                    fileStoreTable.fileIO(),
+                                    fileStoreTable.location(),
+                                    fileStoreTable.snapshotManager().branch());
                     if (nextSnapshotId == null) {
                         consumerManager.deleteConsumer(consumerId);
                     } else {

Reply via email to