This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 877eb3099 [flink] Introduce create-tag and delete-tag actions (#1422)
877eb3099 is described below

commit 877eb3099a43f8c45b824059f79832dc1e3d280c
Author: yuzelin <[email protected]>
AuthorDate: Mon Jun 26 13:48:41 2023 +0800

    [flink] Introduce create-tag and delete-tag actions (#1422)
---
 .../java/org/apache/paimon/utils/TagManager.java   | 10 ++-
 .../apache/paimon/tests/FlinkActionsE2eTest.java   | 82 +++++++++++++++++++
 .../org/apache/paimon/flink/action/Action.java     |  8 ++
 .../paimon/flink/action/CreateTagAction.java       | 92 ++++++++++++++++++++++
 .../paimon/flink/action/DeleteTagAction.java       | 87 ++++++++++++++++++++
 .../apache/paimon/flink/CatalogTableITCase.java    | 14 ++++
 .../paimon/flink/action/TagActionITCase.java       | 84 ++++++++++++++++++++
 7 files changed, 373 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index f16871d43..4a4f13dba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.SortedMap;
@@ -112,9 +113,8 @@ public class TagManager {
 
         for (FileStatus status : statuses) {
             Path path = status.getPath();
-            if (path.getName().startsWith(TAG_PREFIX)) {
-                tags.put(Snapshot.fromPath(fileIO, path), path.getName());
-            }
+            tags.put(
+                    Snapshot.fromPath(fileIO, path), 
path.getName().substring(TAG_PREFIX.length()));
         }
         return tags;
     }
@@ -135,7 +135,9 @@ public class TagManager {
                                 tagDirectory));
             }
 
-            return statuses;
+            return Arrays.stream(statuses)
+                    .filter(status -> 
status.getPath().getName().startsWith(TAG_PREFIX))
+                    .toArray(FileStatus[]::new);
         } catch (IOException e) {
             throw new RuntimeException(
                     String.format("Failed to list status in the '%s' 
directory.", tagDirectory), e);
diff --git 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
index cd3e23e5f..deb8da838 100644
--- 
a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
+++ 
b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
@@ -318,6 +318,88 @@ public class FlinkActionsE2eTest extends E2eTestBase {
         checkResult("1, Hi", "2, World");
     }
 
+    @Test
+    public void testCreateAndDeleteTag() throws Exception {
+        String tableTDdl =
+                "CREATE TABLE IF NOT EXISTS T (\n"
+                        + "    k INT,\n"
+                        + "    v STRING,\n"
+                        + "    PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ");\n";
+
+        // three separate INSERT statements; the test expects them to yield snapshots 1, 2 and 3
+        String inserts =
+                "INSERT INTO T VALUES (1, 'Hi');\n"
+                        + "INSERT INTO T VALUES (2, 'Hello');\n"
+                        + "INSERT INTO T VALUES (3, 'Paimon');\n";
+
+        runSql("SET 'table.dml-sync' = 'true';\n" + inserts, catalogDdl, 
useCatalogCmd, tableTDdl);
+
+        // run create-tag through the Flink CLI in the job manager container: tag2 at snapshot 2
+        Container.ExecResult execResult =
+                jobManager.execInContainer(
+                        "bin/flink",
+                        "run",
+                        "lib/paimon-flink-action.jar",
+                        "create-tag",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "T",
+                        "--tag-name",
+                        "tag2",
+                        "--snapshot",
+                        "2");
+        LOG.info(execResult.getStdout()); // CLI output is only logged; nothing asserts on it
+        LOG.info(execResult.getStderr());
+
+        runSql(
+                "INSERT INTO _tags1 SELECT tag_name, snapshot_id FROM 
T\\$tags;",
+                catalogDdl,
+                useCatalogCmd,
+                createResultSink("_tags1", "tag_name STRING, snapshot_id 
BIGINT"));
+        checkResult("tag2, 2"); // the tags system table query should have produced (tag2, 2)
+        clearCurrentResults();
+
+        // batch read of T with the scan.tag-name hint set to tag2; expects rows of snapshots 1-2
+        runSql(
+                "SET 'execution.runtime-mode' = 'batch';\n"
+                        + "INSERT INTO result1 SELECT * FROM T /*+ 
OPTIONS('scan.tag-name'='tag2') */;",
+                catalogDdl,
+                useCatalogCmd,
+                createResultSink("result1", "k INT, v STRING"));
+        checkResult("1, Hi", "2, Hello");
+        clearCurrentResults();
+
+        // run delete-tag through the Flink CLI, then query the tags system table again
+        execResult =
+                jobManager.execInContainer(
+                        "bin/flink",
+                        "run",
+                        "lib/paimon-flink-action.jar",
+                        "delete-tag",
+                        "--warehouse",
+                        warehousePath,
+                        "--database",
+                        "default",
+                        "--table",
+                        "T",
+                        "--tag-name",
+                        "tag2");
+        LOG.info(execResult.getStdout());
+        LOG.info(execResult.getStderr());
+
+        runSql(
+                "INSERT INTO _tags2 SELECT tag_name, snapshot_id FROM 
T\\$tags;",
+                catalogDdl,
+                useCatalogCmd,
+                createResultSink("_tags2", "tag_name STRING, snapshot_id 
BIGINT"));
+        Thread.sleep(5000); // NOTE(review): fixed wait before the empty check; presumably lets the job finish - confirm
+        checkResult(); // called with no expected rows: no tag should be listed after delete-tag
+    }
+
     private void runSql(String sql, String... ddls) throws Exception {
         runSql(String.join("\n", ddls) + "\n" + sql);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 5caf190a9..01e3a6728 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -130,6 +130,8 @@ public interface Action {
         private static final String DELETE = "delete";
         private static final String MERGE_INTO = "merge-into";
         private static final String ROLLBACK_TO = "rollback-to";
+        private static final String CREATE_TAG = "create-tag";
+        private static final String DELETE_TAG = "delete-tag";
         // cdc actions
         private static final String MYSQL_SYNC_TABLE = "mysql-sync-table";
         private static final String MYSQL_SYNC_DATABASE = 
"mysql-sync-database";
@@ -152,6 +154,10 @@ public interface Action {
                     return MergeIntoAction.create(actionArgs);
                 case ROLLBACK_TO:
                     return RollbackToAction.create(actionArgs);
+                case CREATE_TAG:
+                    return CreateTagAction.create(actionArgs);
+                case DELETE_TAG:
+                    return DeleteTagAction.create(actionArgs);
                 case MYSQL_SYNC_TABLE:
                     return MySqlSyncTableAction.create(actionArgs);
                 case MYSQL_SYNC_DATABASE:
@@ -176,6 +182,8 @@ public interface Action {
             System.out.println("  " + DELETE);
             System.out.println("  " + MERGE_INTO);
             System.out.println("  " + ROLLBACK_TO);
+            System.out.println("  " + CREATE_TAG);
+            System.out.println("  " + DELETE_TAG);
             System.out.println("  " + MYSQL_SYNC_TABLE);
             System.out.println("  " + MYSQL_SYNC_DATABASE);
             System.out.println("  " + KAFKA_SYNC_TABLE);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
new file mode 100644
index 000000000..39fbcb45e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.action.Action.checkRequiredArgument;
+import static org.apache.paimon.flink.action.Action.getTablePath;
+import static org.apache.paimon.flink.action.Action.optionalConfigMap;
+
+/**
+ * Create tag action for Flink.
+ *
+ * <p>Holds a tag name and a snapshot id. {@link #run()} passes both to
+ * {@code table.createTag(tagName, snapshotId)}; the {@code table} reference is not declared in
+ * this class (presumably inherited from {@link TableActionBase} - confirm there).
+ *
+ * <p>{@link #create(String[])} builds the action from command-line arguments. It requires
+ * {@code tag-name} and {@code snapshot}, resolves the warehouse / database / table triple via
+ * {@code getTablePath}, and reads optional {@code catalog-conf} entries. The snapshot value is
+ * parsed with {@link Long#parseLong(String)}, so a non-numeric value surfaces as a
+ * {@link NumberFormatException}. When the {@code help} parameter is present the usage text is
+ * printed and an empty {@link Optional} is returned instead of an action.
+ */
+public class CreateTagAction extends TableActionBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(CreateTagAction.class);
+
+    private final String tagName;
+    private final long snapshotId;
+
+    public CreateTagAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String tagName,
+            long snapshotId) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+        this.tagName = tagName;
+        this.snapshotId = snapshotId;
+    }
+
+    public static Optional<Action> create(String[] args) {
+        LOG.info("create-tag action args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty(); // help requested: no action is created
+        }
+
+        checkRequiredArgument(params, "tag-name");
+        checkRequiredArgument(params, "snapshot");
+
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
+        String tagName = params.get("tag-name");
+        long snapshot = Long.parseLong(params.get("snapshot")); // NumberFormatException if not a long
+
+        CreateTagAction action =
+                new CreateTagAction(
+                        tablePath.f0, tablePath.f1, tablePath.f2, 
catalogConfig, tagName, snapshot);
+        return Optional.of(action);
+    }
+
+    private static void printHelp() {
+        System.out.println("Action \"create-tag\" creates a tag from given 
snapshot.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  create-tag --warehouse <warehouse-path> --database 
<database-name> "
+                        + "--table <table-name> --tag-name <tag-name> 
--snapshot <snapshot-id>");
+        System.out.println();
+    }
+
+    @Override
+    public void run() throws Exception {
+        table.createTag(tagName, snapshotId);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
new file mode 100644
index 000000000..6eb5a30db
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.action.Action.checkRequiredArgument;
+import static org.apache.paimon.flink.action.Action.getTablePath;
+import static org.apache.paimon.flink.action.Action.optionalConfigMap;
+
+/**
+ * Delete tag action for Flink.
+ *
+ * <p>Holds a tag name. {@link #run()} passes it to {@code table.deleteTag(tagName)}; the
+ * {@code table} reference is not declared in this class (presumably inherited from
+ * {@link TableActionBase} - confirm there).
+ *
+ * <p>{@link #create(String[])} builds the action from command-line arguments. It requires
+ * {@code tag-name}, resolves the warehouse / database / table triple via {@code getTablePath},
+ * and reads optional {@code catalog-conf} entries. When the {@code help} parameter is present
+ * the usage text is printed and an empty {@link Optional} is returned instead of an action.
+ */
+public class DeleteTagAction extends TableActionBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DeleteTagAction.class);
+
+    private final String tagName;
+
+    public DeleteTagAction(
+            String warehouse,
+            String databaseName,
+            String tableName,
+            Map<String, String> catalogConfig,
+            String tagName) {
+        super(warehouse, databaseName, tableName, catalogConfig);
+        this.tagName = tagName;
+    }
+
+    public static Optional<Action> create(String[] args) {
+        LOG.info("delete-tag action args: {}", String.join(" ", args));
+
+        MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+        if (params.has("help")) {
+            printHelp();
+            return Optional.empty(); // help requested: no action is created
+        }
+
+        checkRequiredArgument(params, "tag-name");
+
+        Tuple3<String, String, String> tablePath = getTablePath(params);
+        Map<String, String> catalogConfig = optionalConfigMap(params, 
"catalog-conf");
+        String tagName = params.get("tag-name");
+
+        DeleteTagAction action =
+                new DeleteTagAction(
+                        tablePath.f0, tablePath.f1, tablePath.f2, 
catalogConfig, tagName);
+        return Optional.of(action);
+    }
+
+    private static void printHelp() {
+        System.out.println("Action \"delete-tag\" deletes a tag by name.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  delete-tag --warehouse <warehouse-path> --database 
<database-name> "
+                        + "--table <table-name> --tag-name <tag-name>");
+        System.out.println();
+    }
+
+    @Override
+    public void run() throws Exception {
+        table.deleteTag(tagName);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e3ff399eb..2752765dc 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -470,4 +470,18 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                                         ","))
                 .collect(Collectors.toList());
     }
+
+    @Test
+    public void testTagsTable() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT)");
+        sql("INSERT INTO T VALUES (1, 2)"); // expected to become snapshot 1
+        sql("INSERT INTO T VALUES (3, 4)"); // expected to become snapshot 2
+
+        paimonTable("T").createTag("tag1", 1); // tag1 at snapshot 1
+        paimonTable("T").createTag("tag2", 2); // tag2 at snapshot 2
+
+        List<Row> result = sql("SELECT tag_name, snapshot_id, schema_id, 
record_count FROM T$tags");
+
+        assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), 
Row.of("tag2", 2L, 0L, 2L)); // row layout follows the SELECT column order above
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
new file mode 100644
index 000000000..d0f3e2879
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static 
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * IT cases for tag management actions.
+ *
+ * <p>The single test creates a table with columns {@code k} and {@code v}, writes three rows,
+ * then constructs {@link CreateTagAction} and {@link DeleteTagAction} directly and calls their
+ * {@code run()} methods. The outcome is checked with {@code tagManager.tagExists} and with a
+ * batch read that uses the {@code scan.tag-name} option.
+ */
+public class TagActionITCase extends ActionITCaseBase {
+
+    @Test
+    public void testCreateAndDeleteTag() throws Exception {
+        init(warehouse);
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.BIGINT(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType,
+                        Collections.emptyList(), // presumably partition keys - confirm in createFileStoreTable
+                        Collections.singletonList("k"), // presumably primary keys - confirm likewise
+                        Collections.emptyMap());
+
+        StreamWriteBuilder writeBuilder = 
table.newStreamWriteBuilder().withCommitUser(commitUser);
+        write = writeBuilder.newWrite();
+        commit = writeBuilder.newCommit();
+
+        // three writeData calls; the test relies on them producing snapshots 1, 2 and 3
+        writeData(rowData(1L, BinaryString.fromString("Hi")));
+        writeData(rowData(2L, BinaryString.fromString("Hello")));
+        writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+        TagManager tagManager = new TagManager(table.fileIO(), 
table.location());
+
+        CreateTagAction createTagAction =
+                new CreateTagAction(
+                        warehouse, database, tableName, 
Collections.emptyMap(), "tag2", 2);
+        createTagAction.run();
+        assertThat(tagManager.tagExists("tag2")).isTrue();
+
+        // batch read with the scan.tag-name option set to tag2; only the first two rows are expected
+        testBatchRead(
+                "SELECT * FROM `" + tableName + "` /*+ 
OPTIONS('scan.tag-name'='tag2') */",
+                Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
+
+        DeleteTagAction deleteTagAction =
+                new DeleteTagAction(warehouse, database, tableName, 
Collections.emptyMap(), "tag2");
+        deleteTagAction.run();
+        assertThat(tagManager.tagExists("tag2")).isFalse();
+    }
+}

Reply via email to