This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 877eb3099 [flink] Introduce create-tag and delete-tag actions (#1422)
877eb3099 is described below
commit 877eb3099a43f8c45b824059f79832dc1e3d280c
Author: yuzelin <[email protected]>
AuthorDate: Mon Jun 26 13:48:41 2023 +0800
[flink] Introduce create-tag and delete-tag actions (#1422)
---
.../java/org/apache/paimon/utils/TagManager.java | 10 ++-
.../apache/paimon/tests/FlinkActionsE2eTest.java | 82 +++++++++++++++++++
.../org/apache/paimon/flink/action/Action.java | 8 ++
.../paimon/flink/action/CreateTagAction.java | 92 ++++++++++++++++++++++
.../paimon/flink/action/DeleteTagAction.java | 87 ++++++++++++++++++++
.../apache/paimon/flink/CatalogTableITCase.java | 14 ++++
.../paimon/flink/action/TagActionITCase.java | 84 ++++++++++++++++++++
7 files changed, 373 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
index f16871d43..4a4f13dba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.SortedMap;
@@ -112,9 +113,8 @@ public class TagManager {
for (FileStatus status : statuses) {
Path path = status.getPath();
- if (path.getName().startsWith(TAG_PREFIX)) {
- tags.put(Snapshot.fromPath(fileIO, path), path.getName());
- }
+ tags.put(
+ Snapshot.fromPath(fileIO, path), path.getName().substring(TAG_PREFIX.length()));
}
return tags;
}
@@ -135,7 +135,9 @@ public class TagManager {
tagDirectory));
}
- return statuses;
+ return Arrays.stream(statuses)
+ .filter(status -> status.getPath().getName().startsWith(TAG_PREFIX))
+ .toArray(FileStatus[]::new);
} catch (IOException e) {
throw new RuntimeException(
String.format("Failed to list status in the '%s' directory.", tagDirectory), e);
diff --git a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
index cd3e23e5f..deb8da838 100644
--- a/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
+++ b/paimon-e2e-tests/src/test/java/org/apache/paimon/tests/FlinkActionsE2eTest.java
@@ -318,6 +318,88 @@ public class FlinkActionsE2eTest extends E2eTestBase {
checkResult("1, Hi", "2, World");
}
+ @Test
+ public void testCreateAndDeleteTag() throws Exception {
+ String tableTDdl =
+ "CREATE TABLE IF NOT EXISTS T (\n"
+ + " k INT,\n"
+ + " v STRING,\n"
+ + " PRIMARY KEY (k) NOT ENFORCED\n"
+ + ");\n";
+
+ // 3 snapshots
+ String inserts =
+ "INSERT INTO T VALUES (1, 'Hi');\n"
+ + "INSERT INTO T VALUES (2, 'Hello');\n"
+ + "INSERT INTO T VALUES (3, 'Paimon');\n";
+
+ runSql("SET 'table.dml-sync' = 'true';\n" + inserts, catalogDdl, useCatalogCmd, tableTDdl);
+
+ // create tag at snapshot 2 and check
+ Container.ExecResult execResult =
+ jobManager.execInContainer(
+ "bin/flink",
+ "run",
+ "lib/paimon-flink-action.jar",
+ "create-tag",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "T",
+ "--tag-name",
+ "tag2",
+ "--snapshot",
+ "2");
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ runSql(
+ "INSERT INTO _tags1 SELECT tag_name, snapshot_id FROM T\\$tags;",
+ catalogDdl,
+ useCatalogCmd,
+ createResultSink("_tags1", "tag_name STRING, snapshot_id BIGINT"));
+ checkResult("tag2, 2");
+ clearCurrentResults();
+
+ // read tag2
+ runSql(
+ "SET 'execution.runtime-mode' = 'batch';\n"
+ + "INSERT INTO result1 SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */;",
+ catalogDdl,
+ useCatalogCmd,
+ createResultSink("result1", "k INT, v STRING"));
+ checkResult("1, Hi", "2, Hello");
+ clearCurrentResults();
+
+ // delete tag2 and check
+ execResult =
+ jobManager.execInContainer(
+ "bin/flink",
+ "run",
+ "lib/paimon-flink-action.jar",
+ "delete-tag",
+ "--warehouse",
+ warehousePath,
+ "--database",
+ "default",
+ "--table",
+ "T",
+ "--tag-name",
+ "tag2");
+ LOG.info(execResult.getStdout());
+ LOG.info(execResult.getStderr());
+
+ runSql(
+ "INSERT INTO _tags2 SELECT tag_name, snapshot_id FROM T\\$tags;",
+ catalogDdl,
+ useCatalogCmd,
+ createResultSink("_tags2", "tag_name STRING, snapshot_id BIGINT"));
+ Thread.sleep(5000);
+ checkResult();
+ }
+
private void runSql(String sql, String... ddls) throws Exception {
runSql(String.join("\n", ddls) + "\n" + sql);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
index 5caf190a9..01e3a6728 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/Action.java
@@ -130,6 +130,8 @@ public interface Action {
private static final String DELETE = "delete";
private static final String MERGE_INTO = "merge-into";
private static final String ROLLBACK_TO = "rollback-to";
+ private static final String CREATE_TAG = "create-tag";
+ private static final String DELETE_TAG = "delete-tag";
// cdc actions
private static final String MYSQL_SYNC_TABLE = "mysql-sync-table";
private static final String MYSQL_SYNC_DATABASE = "mysql-sync-database";
@@ -152,6 +154,10 @@ public interface Action {
return MergeIntoAction.create(actionArgs);
case ROLLBACK_TO:
return RollbackToAction.create(actionArgs);
+ case CREATE_TAG:
+ return CreateTagAction.create(actionArgs);
+ case DELETE_TAG:
+ return DeleteTagAction.create(actionArgs);
case MYSQL_SYNC_TABLE:
return MySqlSyncTableAction.create(actionArgs);
case MYSQL_SYNC_DATABASE:
@@ -176,6 +182,8 @@ public interface Action {
System.out.println(" " + DELETE);
System.out.println(" " + MERGE_INTO);
System.out.println(" " + ROLLBACK_TO);
+ System.out.println(" " + CREATE_TAG);
+ System.out.println(" " + DELETE_TAG);
System.out.println(" " + MYSQL_SYNC_TABLE);
System.out.println(" " + MYSQL_SYNC_DATABASE);
System.out.println(" " + KAFKA_SYNC_TABLE);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
new file mode 100644
index 000000000..39fbcb45e
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CreateTagAction.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.action.Action.checkRequiredArgument;
+import static org.apache.paimon.flink.action.Action.getTablePath;
+import static org.apache.paimon.flink.action.Action.optionalConfigMap;
+
+/** Create tag action for Flink. */
+public class CreateTagAction extends TableActionBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CreateTagAction.class);
+
+ private final String tagName;
+ private final long snapshotId;
+
+ public CreateTagAction(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ Map<String, String> catalogConfig,
+ String tagName,
+ long snapshotId) {
+ super(warehouse, databaseName, tableName, catalogConfig);
+ this.tagName = tagName;
+ this.snapshotId = snapshotId;
+ }
+
+ public static Optional<Action> create(String[] args) {
+ LOG.info("create-tag action args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ checkRequiredArgument(params, "tag-name");
+ checkRequiredArgument(params, "snapshot");
+
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ String tagName = params.get("tag-name");
+ long snapshot = Long.parseLong(params.get("snapshot"));
+
+ CreateTagAction action =
+ new CreateTagAction(
+ tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName, snapshot);
+ return Optional.of(action);
+ }
+
+ private static void printHelp() {
+ System.out.println("Action \"create-tag\" creates a tag from given snapshot.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " create-tag --warehouse <warehouse-path> --database <database-name> "
+ + "--table <table-name> --tag-name <tag-name> --snapshot <snapshot-id>");
+ System.out.println();
+ }
+
+ @Override
+ public void run() throws Exception {
+ table.createTag(tagName, snapshotId);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
new file mode 100644
index 000000000..6eb5a30db
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DeleteTagAction.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.paimon.flink.action.Action.checkRequiredArgument;
+import static org.apache.paimon.flink.action.Action.getTablePath;
+import static org.apache.paimon.flink.action.Action.optionalConfigMap;
+
+/** Delete tag action for Flink. */
+public class DeleteTagAction extends TableActionBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DeleteTagAction.class);
+
+ private final String tagName;
+
+ public DeleteTagAction(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ Map<String, String> catalogConfig,
+ String tagName) {
+ super(warehouse, databaseName, tableName, catalogConfig);
+ this.tagName = tagName;
+ }
+
+ public static Optional<Action> create(String[] args) {
+ LOG.info("delete-tag action args: {}", String.join(" ", args));
+
+ MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
+
+ if (params.has("help")) {
+ printHelp();
+ return Optional.empty();
+ }
+
+ checkRequiredArgument(params, "tag-name");
+
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+ String tagName = params.get("tag-name");
+
+ DeleteTagAction action =
+ new DeleteTagAction(
+ tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig, tagName);
+ return Optional.of(action);
+ }
+
+ private static void printHelp() {
+ System.out.println("Action \"delete-tag\" deletes a tag by name.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " delete-tag --warehouse <warehouse-path> --database <database-name> "
+ + "--table <table-name> --tag-name <tag-name>");
+ System.out.println();
+ }
+
+ @Override
+ public void run() throws Exception {
+ table.deleteTag(tagName);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index e3ff399eb..2752765dc 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -470,4 +470,18 @@ public class CatalogTableITCase extends CatalogITCaseBase {
","))
.collect(Collectors.toList());
}
+
+ @Test
+ public void testTagsTable() throws Exception {
+ sql("CREATE TABLE T (a INT, b INT)");
+ sql("INSERT INTO T VALUES (1, 2)");
+ sql("INSERT INTO T VALUES (3, 4)");
+
+ paimonTable("T").createTag("tag1", 1);
+ paimonTable("T").createTag("tag2", 2);
+
+ List<Row> result = sql("SELECT tag_name, snapshot_id, schema_id, record_count FROM T$tags");
+
+ assertThat(result).containsExactly(Row.of("tag1", 1L, 0L, 1L), Row.of("tag2", 2L, 0L, 2L));
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
new file mode 100644
index 000000000..d0f3e2879
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TagManager;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
+import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for tag management actions. */
+public class TagActionITCase extends ActionITCaseBase {
+
+ @Test
+ public void testCreateAndDeleteTag() throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"k", "v"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("k"),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ TagManager tagManager = new TagManager(table.fileIO(), table.location());
+
+ CreateTagAction createTagAction =
+ new CreateTagAction(
+ warehouse, database, tableName, Collections.emptyMap(), "tag2", 2);
+ createTagAction.run();
+ assertThat(tagManager.tagExists("tag2")).isTrue();
+
+ // read tag2
+ testBatchRead(
+ "SELECT * FROM `" + tableName + "` /*+ OPTIONS('scan.tag-name'='tag2') */",
+ Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
+
+ DeleteTagAction deleteTagAction =
+ new DeleteTagAction(warehouse, database, tableName, Collections.emptyMap(), "tag2");
+ deleteTagAction.run();
+ assertThat(tagManager.tagExists("tag2")).isFalse();
+ }
+}