This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 92ab2ed73 [flink] Transform actions to procedures (#2027)
92ab2ed73 is described below
commit 92ab2ed73173221ce4be1ebafb15213de59e5474
Author: yuzelin <[email protected]>
AuthorDate: Wed Sep 20 21:11:41 2023 +0800
[flink] Transform actions to procedures (#2027)
---
.../flink/action/MergeIntoActionFactory.java | 18 +-
.../paimon/flink/action/TableActionBase.java | 4 -
.../flink/procedure/CompactDatabaseProcedure.java | 121 +++++++++++++
.../paimon/flink/procedure/CompactProcedure.java | 23 +--
.../paimon/flink/procedure/CreateTagProcedure.java | 50 ++++++
.../paimon/flink/procedure/DeleteTagProcedure.java | 49 ++++++
.../flink/procedure/DropPartitionProcedure.java | 61 +++++++
.../paimon/flink/procedure/MergeIntoProcedure.java | 192 +++++++++++++++++++++
.../paimon/flink/procedure/ProcedureBase.java | 29 +++-
.../paimon/flink/procedure/ProcedureUtil.java | 26 ++-
.../flink/procedure/ResetConsumerProcedure.java | 58 +++++++
.../flink/procedure/RollbackToProcedure.java | 61 +++++++
.../paimon/flink/action/ActionITCaseBase.java | 8 +-
.../flink/action/CompactDatabaseActionITCase.java | 169 +++++++++---------
.../paimon/flink/action/ConsumerActionITCase.java | 14 +-
.../flink/action/DropPartitionActionITCase.java | 42 +++--
.../paimon/flink/action/MergeIntoActionITCase.java | 90 +++++++++-
.../flink/action/RollbackToActionITCase.java | 9 +-
.../paimon/flink/action/TagActionITCase.java | 21 ++-
19 files changed, 891 insertions(+), 154 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
index 5fae57222..27fe6069b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoActionFactory.java
@@ -33,6 +33,12 @@ public class MergeIntoActionFactory implements ActionFactory
{
public static final String IDENTIFIER = "merge-into";
+ public static final String MATCHED_UPSERT = "matched-upsert";
+ public static final String NOT_MATCHED_BY_SOURCE_UPSERT =
"not-matched-by-source-upsert";
+ public static final String MATCHED_DELETE = "matched-delete";
+ public static final String NOT_MATCHED_BY_SOURCE_DELETE =
"not-matched-by-source-delete";
+ public static final String NOT_MATCHED_INSERT = "not-matched-insert";
+
@Override
public String identifier() {
return IDENTIFIER;
@@ -66,25 +72,25 @@ public class MergeIntoActionFactory implements
ActionFactory {
Arrays.stream(params.get("merge-actions").split(","))
.map(String::trim)
.collect(Collectors.toList());
- if (actions.contains("matched-upsert")) {
+ if (actions.contains(MATCHED_UPSERT)) {
checkRequiredArgument(params, "matched-upsert-set");
action.withMatchedUpsert(
params.get("matched-upsert-condition"),
params.get("matched-upsert-set"));
}
- if (actions.contains("not-matched-by-source-upsert")) {
+ if (actions.contains(NOT_MATCHED_BY_SOURCE_UPSERT)) {
checkRequiredArgument(params, "not-matched-by-source-upsert-set");
action.withNotMatchedBySourceUpsert(
params.get("not-matched-by-source-upsert-condition"),
params.get("not-matched-by-source-upsert-set"));
}
- if (actions.contains("matched-delete")) {
+ if (actions.contains(MATCHED_DELETE)) {
action.withMatchedDelete(params.get("matched-delete-condition"));
}
- if (actions.contains("not-matched-by-source-delete")) {
+ if (actions.contains(NOT_MATCHED_BY_SOURCE_DELETE)) {
action.withNotMatchedBySourceDelete(
params.get("not-matched-by-source-delete-condition"));
}
- if (actions.contains("not-matched-insert")) {
+ if (actions.contains(NOT_MATCHED_INSERT)) {
checkRequiredArgument(params, "not-matched-insert-values");
action.withNotMatchedInsert(
params.get("not-matched-insert-condition"),
@@ -182,7 +188,7 @@ public class MergeIntoActionFactory implements
ActionFactory {
" It will find matched rows of target table that meet
condition (T.k = S.k), then update T.v with S.v where (T.v <> S.v).");
}
- private void validate(MergeIntoAction action) {
+ public static void validate(MergeIntoAction action) {
if (!action.matchedUpsert
&& !action.notMatchedUpsert
&& !action.matchedDelete
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 02b8e1943..5095cc352 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -30,8 +30,6 @@ import org.apache.paimon.utils.Preconditions;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
@@ -41,8 +39,6 @@ import java.util.Map;
/** Abstract base of {@link Action} for table. */
public abstract class TableActionBase extends ActionBase {
- private static final Logger LOG =
LoggerFactory.getLogger(TableActionBase.class);
-
protected Table table;
protected final Identifier identifier;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
new file mode 100644
index 000000000..10b06931a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactDatabaseProcedure.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.CompactDatabaseAction;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Map;
+
+import static
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
+
+/**
+ * Compact database procedure. Usage:
+ *
+ * <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- compact all databases
+ * CALL compact_database()
+ *
+ * -- compact some databases (accept regular expression)
+ * CALL compact_database('includingDatabases')
+ *
+ * -- set compact mode
+ * CALL compact_database('includingDatabases', 'mode')
+ *
+ * -- compact some tables (accept regular expression)
+ * CALL compact_database('includingDatabases', 'mode', 'includingTables')
+ *
+ * -- exclude some tables (accept regular expression)
+ * CALL compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables')
+ *
+ * -- set table options ('k=v,...')
+ * CALL compact_database('includingDatabases', 'mode', 'includingTables',
'excludingTables', 'tableOptions')
+ * </code></pre>
+ */
+public class CompactDatabaseProcedure extends ProcedureBase {
+
+ public static final String NAME = "compact_database";
+
+ public CompactDatabaseProcedure(Catalog catalog) {
+ super(catalog);
+ }
+
+ public String[] call(ProcedureContext procedureContext) throws Exception {
+ return call(procedureContext, "");
+ }
+
+ public String[] call(ProcedureContext procedureContext, String
includingDatabases)
+ throws Exception {
+ return call(procedureContext, includingDatabases, "");
+ }
+
+ public String[] call(ProcedureContext procedureContext, String
includingDatabases, String mode)
+ throws Exception {
+ return call(procedureContext, includingDatabases, mode, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String includingDatabases,
+ String mode,
+ String includingTables)
+ throws Exception {
+ return call(procedureContext, includingDatabases, mode,
includingTables, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String includingDatabases,
+ String mode,
+ String includingTables,
+ String excludingTables)
+ throws Exception {
+ return call(
+ procedureContext, includingDatabases, mode, includingTables,
excludingTables, "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String includingDatabases,
+ String mode,
+ String includingTables,
+ String excludingTables,
+ String tableOptions)
+ throws Exception {
+ String warehouse = ((AbstractCatalog) catalog).warehouse();
+ Map<String, String> catalogOptions = ((AbstractCatalog)
catalog).options();
+ CompactDatabaseAction action =
+ new CompactDatabaseAction(warehouse, catalogOptions)
+ .includingDatabases(nullable(includingDatabases))
+ .includingTables(nullable(includingTables))
+ .excludingTables(nullable(excludingTables))
+ .withDatabaseCompactMode(nullable(mode));
+ if (!StringUtils.isBlank(tableOptions)) {
+
action.withTableOptions(parseCommaSeparatedKeyValues(tableOptions));
+ }
+
+ return execute(procedureContext, action, "Compact database job");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
index cb8aa3162..e4970e6f8 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -24,32 +24,30 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.CompactAction;
import org.apache.paimon.flink.action.SortCompactAction;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.procedure.ProcedureContext;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
-import static
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
-
/**
* Compact procedure. Usage:
*
* <pre><code>
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
* -- compact a table (tableId should be 'database_name.table_name')
* CALL compact('tableId')
*
* -- compact a table with sorting
- * CALL compact('tableId', 'order-strategy', 'order-by-columns')
+ * CALL compact('tableId', 'orderStrategy', 'orderByColumns')
*
* -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
- * -- NOTE: if you don't need sorting but you want specify partitions, use ''
as placeholder
* CALL compact('tableId', '', '', partition1, partition2, ...)
* </code></pre>
*/
public class CompactProcedure extends ProcedureBase {
+ public static final String NAME = "compact";
+
public CompactProcedure(Catalog catalog) {
super(catalog);
}
@@ -103,16 +101,9 @@ public class CompactProcedure extends ProcedureBase {
}
if (partitionStrings.length != 0) {
- List<Map<String, String>> partitions = new ArrayList<>();
- for (String partition : partitionStrings) {
- partitions.add(parseCommaSeparatedKeyValues(partition));
- }
- action.withPartitions(partitions);
+ action.withPartitions(getPartitions(partitionStrings));
}
- StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
- action.build(env);
-
- return execute(env, jobName);
+ return execute(procedureContext, action, jobName);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
new file mode 100644
index 000000000..0a672046d
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CreateTagProcedure.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure that creates a named tag on a snapshot of a table. Usage:
+ *
+ * <pre><code>
+ * CALL create_tag('tableId', 'tagName', snapshotId)
+ * </code></pre>
+ *
+ * <p>{@code tableId} is parsed with {@link Identifier#fromString(String)}.
+ */
+public class CreateTagProcedure extends ProcedureBase {
+
+    public static final String NAME = "create_tag";
+
+    public CreateTagProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Looks up the table and tags the given snapshot.
+     *
+     * @throws Catalog.TableNotExistException if the catalog has no such table
+     */
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String tagName, long snapshotId)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        Table target = catalog.getTable(identifier);
+        target.createTag(tagName, snapshotId);
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
new file mode 100644
index 000000000..83a99e7a3
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DeleteTagProcedure.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Procedure that removes a named tag from a table. Usage:
+ *
+ * <pre><code>
+ * CALL delete_tag('tableId', 'tagName')
+ * </code></pre>
+ *
+ * <p>{@code tableId} is parsed with {@link Identifier#fromString(String)}.
+ */
+public class DeleteTagProcedure extends ProcedureBase {
+
+    public static final String NAME = "delete_tag";
+
+    public DeleteTagProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Looks up the table and deletes the tag.
+     *
+     * @throws Catalog.TableNotExistException if the catalog has no such table
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
+            throws Catalog.TableNotExistException {
+        Identifier identifier = Identifier.fromString(tableId);
+        Table target = catalog.getTable(identifier);
+        target.deleteTag(tagName);
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
new file mode 100644
index 000000000..26d8f2919
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.operation.FileStoreCommit;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.UUID;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Procedure that drops one or more partitions of a table. Usage:
+ *
+ * <pre><code>
+ * -- each partition is given as 'k1=v1,k2=v2,...'
+ * CALL drop_partition('tableId', 'partition1', 'partition2', ...)
+ * </code></pre>
+ */
+public class DropPartitionProcedure extends ProcedureBase {
+
+    public static final String NAME = "drop_partition";
+
+    public DropPartitionProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Drops the given partitions through a store-level commit.
+     *
+     * @throws IllegalArgumentException if no partition is given
+     * @throws Catalog.TableNotExistException if the catalog has no such table
+     */
+    public String[] call(
+            ProcedureContext procedureContext, String tableId, String... partitionStrings)
+            throws Catalog.TableNotExistException {
+        checkArgument(
+                partitionStrings.length > 0, "drop-partition procedure must specify partitions.");
+
+        Identifier identifier = Identifier.fromString(tableId);
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        // A fresh random commit user is used for this one-off commit.
+        String commitUser = UUID.randomUUID().toString();
+        FileStoreCommit storeCommit = table.store().newCommit(commitUser);
+        storeCommit.dropPartitions(
+                getPartitions(partitionStrings), BatchWriteBuilder.COMMIT_IDENTIFIER);
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
new file mode 100644
index 000000000..6563595e6
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MergeIntoProcedure.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.MergeIntoAction;
+import org.apache.paimon.flink.action.MergeIntoActionFactory;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+
+/**
+ * Merge Into procedure. Usage:
+ *
+ * <pre><code>
+ * CALL merge_into(
+ *     'targetTableId',
+ *     'targetAlias',
+ *     'sourceSqls', -- separate with ';'
+ *     'sourceTable',
+ *     'mergeCondition',
+ *     'mergeActions', -- separate with ','
+ *     -- arguments for merge actions
+ *     ...
+ * )
+ *
+ * -- merge actions and corresponding arguments
+ * matched-upsert: condition, set
+ * not-matched-by-source-upsert: condition, set
+ * matched-delete: condition
+ * not-matched-by-source-delete: condition
+ * not-matched-insert: condition, insertValues
+ *
+ * -- NOTE: the arguments should be in the order of merge actions
+ * -- and use '' as placeholder for optional arguments
+ * </code></pre>
+ *
+ * <p>This procedure will be forced to use batch environments
+ */
+public class MergeIntoProcedure extends ProcedureBase {
+
+    public static final String NAME = "merge_into";
+
+    public MergeIntoProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Builds a {@link MergeIntoAction} from the positional arguments and runs it.
+     *
+     * @throws IllegalArgumentException if source table, merge condition or merge actions are
+     *     blank, or if the number of action arguments does not match the listed actions
+     * @throws UnsupportedOperationException if an unknown merge action is listed
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String targetTableId,
+            String targetAlias,
+            String sourceSqls,
+            String sourceTable,
+            String mergeCondition,
+            String mergeActions,
+            String... mergeActionArguments)
+            throws Exception {
+        String warehouse = ((AbstractCatalog) catalog).warehouse();
+        Map<String, String> catalogOptions = ((AbstractCatalog) catalog).options();
+        Identifier identifier = Identifier.fromString(targetTableId);
+        MergeIntoAction action =
+                new MergeIntoAction(
+                        warehouse,
+                        identifier.getDatabaseName(),
+                        identifier.getObjectName(),
+                        catalogOptions);
+        action.withTargetAlias(nullable(targetAlias));
+
+        // Optional: a null or blank value means "no source SQLs". This check uses the same
+        // blank handling as the other arguments instead of String#isEmpty, which NPEs on null.
+        if (nullable(sourceSqls) != null) {
+            action.withSourceSqls(sourceSqls.split(";"));
+        }
+
+        checkArgument(nullable(sourceTable) != null, "Must specify source table.");
+        action.withSourceTable(sourceTable);
+
+        checkArgument(nullable(mergeCondition) != null, "Must specify merge condition.");
+        action.withMergeCondition(mergeCondition);
+
+        checkArgument(nullable(mergeActions) != null, "Must specify at least one merge action.");
+        List<String> actions =
+                Arrays.stream(mergeActions.split(","))
+                        .map(String::trim)
+                        .collect(Collectors.toList());
+        // Rejects unknown actions and a wrong argument count up front, so the positional
+        // reads from mergeActionArguments below cannot run past the end of the array.
+        validateActions(actions, mergeActionArguments.length);
+
+        int index = 0;
+        for (String mergeAction : actions) {
+            String condition;
+            String setting;
+            switch (mergeAction) {
+                case MATCHED_UPSERT:
+                case NOT_MATCHED_BY_SOURCE_UPSERT:
+                case NOT_MATCHED_INSERT:
+                    condition = nullable(mergeActionArguments[index++]);
+                    setting = nullable(mergeActionArguments[index++]);
+                    checkNotNull(setting, "%s must set the second argument", mergeAction);
+                    setMergeAction(action, mergeAction, condition, setting);
+                    break;
+                case MATCHED_DELETE:
+                case NOT_MATCHED_BY_SOURCE_DELETE:
+                    condition = nullable(mergeActionArguments[index++]);
+                    setMergeAction(action, mergeAction, condition);
+                    break;
+                default:
+                    // Report the offending action name, not the MergeIntoAction object.
+                    throw new UnsupportedOperationException(
+                            "Unknown merge action: " + mergeAction);
+            }
+        }
+
+        MergeIntoActionFactory.validate(action);
+
+        // TODO set dml-sync argument to action
+        action.run();
+
+        return new String[] {"Success"};
+    }
+
+    /**
+     * Checks that every listed action is known and that {@code argumentLength} equals the total
+     * number of arguments those actions consume (2 for upsert/insert actions, 1 for deletes).
+     */
+    private void validateActions(List<String> mergeActions, int argumentLength) {
+        int expectedArguments = 0;
+        for (String action : mergeActions) {
+            switch (action) {
+                case MATCHED_UPSERT:
+                case NOT_MATCHED_BY_SOURCE_UPSERT:
+                case NOT_MATCHED_INSERT:
+                    expectedArguments += 2;
+                    break;
+                case MATCHED_DELETE:
+                case NOT_MATCHED_BY_SOURCE_DELETE:
+                    expectedArguments += 1;
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Unknown merge action: " + action);
+            }
+        }
+
+        checkArgument(
+                expectedArguments == argumentLength,
+                "Expected %s action arguments but given '%s'",
+                expectedArguments,
+                argumentLength);
+    }
+
+    /** Applies one merge action to {@code action}, using {@code arguments} in positional order. */
+    private void setMergeAction(MergeIntoAction action, String mergeAction, String... arguments) {
+        switch (mergeAction) {
+            case MATCHED_UPSERT:
+                action.withMatchedUpsert(arguments[0], arguments[1]);
+                return;
+            case NOT_MATCHED_BY_SOURCE_UPSERT:
+                action.withNotMatchedBySourceUpsert(arguments[0], arguments[1]);
+                return;
+            case NOT_MATCHED_INSERT:
+                action.withNotMatchedInsert(arguments[0], arguments[1]);
+                return;
+            case MATCHED_DELETE:
+                action.withMatchedDelete(arguments[0]);
+                return;
+            case NOT_MATCHED_BY_SOURCE_DELETE:
+                action.withNotMatchedBySourceDelete(arguments[0]);
+                return;
+            default:
+                throw new UnsupportedOperationException("Unknown merge action: " + mergeAction);
+        }
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index c44fe2701..1faa6445e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -19,16 +19,26 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.utils.StringUtils;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.table.procedures.Procedure;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static
org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
/** Base implementation for flink {@link Procedure}. */
public class ProcedureBase implements Procedure {
@@ -39,8 +49,25 @@ public class ProcedureBase implements Procedure {
this.catalog = catalog;
}
- protected String[] execute(StreamExecutionEnvironment env, String
defaultJobName)
+ protected List<Map<String, String>> getPartitions(String...
partitionStrings) {
+ List<Map<String, String>> partitions = new ArrayList<>();
+ for (String partition : partitionStrings) {
+ partitions.add(parseCommaSeparatedKeyValues(partition));
+ }
+ return partitions;
+ }
+
+ @Nullable
+ protected String nullable(String arg) {
+ return StringUtils.isBlank(arg) ? null : arg;
+ }
+
+ protected String[] execute(
+ ProcedureContext procedureContext, Action action, String
defaultJobName)
throws Exception {
+ StreamExecutionEnvironment env =
procedureContext.getExecutionEnvironment();
+ action.build(env);
+
ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
String name =
conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
JobClient jobClient = env.executeAsync(name);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index e90450afc..98fbcda5f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -19,7 +19,6 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.flink.action.CompactActionFactory;
import org.apache.flink.table.procedures.Procedure;
@@ -36,7 +35,14 @@ public class ProcedureUtil {
private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
static {
- SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER);
+ SYSTEM_PROCEDURES.add(CompactProcedure.NAME);
+ SYSTEM_PROCEDURES.add(CompactDatabaseProcedure.NAME);
+ SYSTEM_PROCEDURES.add(CreateTagProcedure.NAME);
+ SYSTEM_PROCEDURES.add(DeleteTagProcedure.NAME);
+ SYSTEM_PROCEDURES.add(DropPartitionProcedure.NAME);
+ SYSTEM_PROCEDURES.add(MergeIntoProcedure.NAME);
+ SYSTEM_PROCEDURES.add(ResetConsumerProcedure.NAME);
+ SYSTEM_PROCEDURES.add(RollbackToProcedure.NAME);
}
public static List<String> listProcedures() {
@@ -45,8 +51,22 @@ public class ProcedureUtil {
public static Optional<Procedure> getProcedure(Catalog catalog, String
procedureName) {
switch (procedureName) {
- case CompactActionFactory.IDENTIFIER:
+ case CompactProcedure.NAME:
return Optional.of(new CompactProcedure(catalog));
+ case CompactDatabaseProcedure.NAME:
+ return Optional.of(new CompactDatabaseProcedure(catalog));
+ case CreateTagProcedure.NAME:
+ return Optional.of(new CreateTagProcedure(catalog));
+ case DeleteTagProcedure.NAME:
+ return Optional.of(new DeleteTagProcedure(catalog));
+ case DropPartitionProcedure.NAME:
+ return Optional.of(new DropPartitionProcedure(catalog));
+ case MergeIntoProcedure.NAME:
+ return Optional.of(new MergeIntoProcedure(catalog));
+ case ResetConsumerProcedure.NAME:
+ return Optional.of(new ResetConsumerProcedure(catalog));
+ case RollbackToProcedure.NAME:
+ return Optional.of(new RollbackToProcedure(catalog));
default:
return Optional.empty();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
new file mode 100644
index 000000000..70a409b0b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ResetConsumerProcedure.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.Consumer;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Reset consumer procedure. Usage:
+ *
+ * <pre><code>
+ * CALL reset_consumer('tableId', 'consumerId', nextSnapshotId)
+ * </code></pre>
+ *
+ * <p>Sets the consumer {@code consumerId} of the table so that its next snapshot becomes
+ * {@code nextSnapshotId}.
+ */
+public class ResetConsumerProcedure extends ProcedureBase {
+
+    public static final String NAME = "reset_consumer";
+
+    public ResetConsumerProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Resets a consumer of the given table.
+     *
+     * @param procedureContext the Flink procedure context (unused)
+     * @param tableId table identifier such as {@code db.table}, parsed with {@link
+     *     Identifier#fromString}
+     * @param consumerId id of the consumer to reset
+     * @param nextSnapshotId value stored as the consumer's next snapshot id
+     * @return a single-element array {@code {"Success"}}
+     * @throws Catalog.TableNotExistException propagated from {@code catalog.getTable}
+     */
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String consumerId,
+            long nextSnapshotId)
+            throws Catalog.TableNotExistException {
+        // Assumes a FileStoreTable; other Table implementations fail this cast.
+        FileStoreTable fileStoreTable =
+                (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+        ConsumerManager consumerManager =
+                new ConsumerManager(fileStoreTable.fileIO(), fileStoreTable.location());
+        consumerManager.resetConsumer(consumerId, new Consumer(nextSnapshotId));
+
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
new file mode 100644
index 000000000..b8d6dd38c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RollbackToProcedure.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Rollback procedure. Usage:
+ *
+ * <pre><code>
+ * -- rollback to a snapshot
+ * CALL rollback_to('tableId', snapshotId)
+ *
+ * -- rollback to a tag
+ * CALL rollback_to('tableId', 'tagName')
+ * </code></pre>
+ */
+public class RollbackToProcedure extends ProcedureBase {
+
+    public static final String NAME = "rollback_to";
+
+    public RollbackToProcedure(Catalog catalog) {
+        super(catalog);
+    }
+
+    /**
+     * Rolls the table back to the given snapshot via {@code Table#rollbackTo(long)}.
+     *
+     * @param procedureContext the Flink procedure context (unused)
+     * @param tableId table identifier such as {@code db.table}, parsed with {@link
+     *     Identifier#fromString}
+     * @param snapshotId id of the snapshot to roll back to
+     * @return a single-element array {@code {"Success"}}
+     * @throws Catalog.TableNotExistException propagated from {@code catalog.getTable}
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, long snapshotId)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.rollbackTo(snapshotId);
+
+        return new String[] {"Success"};
+    }
+
+    /**
+     * Rolls the table back to the given tag via {@code Table#rollbackTo(String)}.
+     *
+     * @param procedureContext the Flink procedure context (unused)
+     * @param tableId table identifier such as {@code db.table}, parsed with {@link
+     *     Identifier#fromString}
+     * @param tagName name of the tag to roll back to
+     * @return a single-element array {@code {"Success"}}
+     * @throws Catalog.TableNotExistException propagated from {@code catalog.getTable}
+     */
+    public String[] call(ProcedureContext procedureContext, String tableId, String tagName)
+            throws Catalog.TableNotExistException {
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        table.rollbackTo(tagName);
+
+        return new String[] {"Success"};
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index 0e71d076f..fe48881d4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -54,6 +54,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
/** {@link Action} test base. */
public abstract class ActionITCaseBase extends AbstractTestBase {
@@ -145,7 +146,7 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.setParallelism(2);
+ env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
if (isStreaming) {
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
@@ -158,6 +159,11 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
return env;
}
+ protected void callProcedure(String procedureStatement) {
+ // defaults to streaming mode (isStreaming = true) with dmlSync = false
+ callProcedure(procedureStatement, true, false);
+ }
+
protected void callProcedure(String procedureStatement, boolean
isStreaming, boolean dmlSync) {
StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index ce2e72641..492d98434 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -38,15 +38,14 @@ import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import javax.annotation.Nullable;
+
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
@@ -95,8 +94,6 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
- Map<String, String> tableOptions = new HashMap<>();
-
List<FileStoreTable> tables = new ArrayList<>();
for (String dbName : DATABASE_NAMES) {
@@ -131,27 +128,16 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- if (mode.equals("divided")) {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+ new CompactDatabaseAction(warehouse, Collections.emptyMap())
+ .withDatabaseCompactMode(mode)
.build(env);
+ env.execute();
} else {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
- .withDatabaseCompactMode("combined")
- .withTableOptions(tableOptions)
- .build(env);
+ callProcedure(String.format("CALL compact_database('', '%s')",
mode), false, true);
}
- env.execute();
-
for (FileStoreTable table : tables) {
SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot =
@@ -181,11 +167,6 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
- Map<String, String> tableOptions = new HashMap<>();
- // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use default
value, the cost time in
- // combined mode will be over 1mins
- tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(),
"1s");
-
List<FileStoreTable> tables = new ArrayList<>();
for (String dbName : DATABASE_NAMES) {
for (String tableName : TABLE_NAMES) {
@@ -220,27 +201,31 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(500);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- if (mode.equals("divided")) {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
- .build(env);
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env = buildDefaultEnv(true);
+ if (mode.equals("divided")) {
+ new CompactDatabaseAction(warehouse, new
HashMap<>()).build(env);
+ } else {
+ // if CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key() use
default value, the cost
+ // time in combined mode will be over 1 min
+ new CompactDatabaseAction(warehouse, new HashMap<>())
+ .withDatabaseCompactMode("combined")
+ .withTableOptions(
+ Collections.singletonMap(
+
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"))
+ .build(env);
+ }
+ env.executeAsync();
} else {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
- .withDatabaseCompactMode("combined")
- .withTableOptions(tableOptions)
- .build(env);
+ if (mode.equals("divided")) {
+ callProcedure("CALL compact_database()", true, false);
+ } else {
+ callProcedure(
+ "CALL compact_database('', 'combined', '', '',
'continuous.discovery-interval=1s')",
+ true,
+ false);
+ }
}
- JobClient client = env.executeAsync();
for (FileStoreTable table : tables) {
StreamTableScan scan = table.newReadBuilder().newStreamScan();
@@ -381,8 +366,6 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
"Cannot validate snapshot expiration in %s
milliseconds.", 60_000));
}
}
-
- client.cancel();
}
@ParameterizedTest(name = "mode = {0}")
@@ -437,9 +420,6 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.WRITE_ONLY.key(), "true");
- Map<String, String> tableOptions = new HashMap<>();
- tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(),
"1s");
-
List<FileStoreTable> compactionTables = new ArrayList<>();
List<FileStoreTable> noCompactionTables = new ArrayList<>();
@@ -480,26 +460,37 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
-
- if (mode.equals("divided")) {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(includingPattern)
- .excludingTables(excludesPattern)
- .build(env);
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+ CompactDatabaseAction action =
+ new CompactDatabaseAction(warehouse,
Collections.emptyMap())
+ .includingTables(includingPattern)
+ .excludingTables(excludesPattern)
+ .withDatabaseCompactMode(mode);
+ if (mode.equals("combined")) {
+ action.withTableOptions(
+ Collections.singletonMap(
+
CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s"));
+ }
+ action.build(env);
+ env.execute();
} else {
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(includingPattern)
- .excludingTables(excludesPattern)
- .withDatabaseCompactMode("combined")
- .withTableOptions(tableOptions)
- .build(env);
+ if (mode.equals("divided")) {
+ callProcedure(
+ String.format(
+ "CALL compact_database('', 'divided', '%s',
'%s')",
+ nonNull(includingPattern),
nonNull(excludesPattern)),
+ false,
+ true);
+ } else {
+ callProcedure(
+ String.format(
+ "CALL compact_database('', 'combined', '%s',
'%s', 'continuous.discovery-interval=1s')",
+ nonNull(includingPattern),
nonNull(excludesPattern)),
+ false,
+ true);
+ }
}
- env.execute();
for (FileStoreTable table : compactionTables) {
SnapshotManager snapshotManager = table.snapshotManager();
@@ -532,6 +523,10 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
}
}
+ private String nonNull(@Nullable String s) {
+ return s == null ? "" : s; // a null pattern is passed to CALL compact_database(...) as ''
+ }
+
@Test
public void testUnawareBucketStreamingCompact() throws Exception {
Map<String, String> options = new HashMap<>();
@@ -573,17 +568,13 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
-
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(500);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
- .build(env);
- JobClient client = env.executeAsync();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env = buildDefaultEnv(true);
+ new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
+ env.executeAsync();
+ } else {
+ callProcedure("CALL compact_database()");
+ }
for (FileStoreTable table : tables) {
StreamWriteBuilder streamWriteBuilder =
@@ -602,8 +593,6 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
// second compaction, snapshot will be 5
checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
}
-
- client.cancel().get();
}
@Test
@@ -646,15 +635,13 @@ public class CompactDatabaseActionITCase extends
CompactActionITCaseBase {
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
}
- StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactDatabaseAction(warehouse, new HashMap<>())
- .includingDatabases(null)
- .includingTables(null)
- .excludingTables(null)
- .build(env);
- env.execute();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ StreamExecutionEnvironment env = buildDefaultEnv(false);
+ new CompactDatabaseAction(warehouse, new HashMap<>()).build(env);
+ env.execute();
+ } else {
+ callProcedure("CALL compact_database()", false, true);
+ }
for (FileStoreTable table : tables) {
// first compaction, snapshot will be 3.
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index e33b3f42d..444d2f71e 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -32,6 +32,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
+import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
@@ -80,11 +81,14 @@ public class ConsumerActionITCase extends ActionITCaseBase {
assertThat(consumer1.get().nextSnapshot()).isEqualTo(4);
// reset consumer
- ResetConsumerAction resetConsumerAction =
- new ResetConsumerAction(
- warehouse, database, tableName,
Collections.emptyMap(), "myid", 1);
- resetConsumerAction.run();
-
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new ResetConsumerAction(
+ warehouse, database, tableName,
Collections.emptyMap(), "myid", 1)
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL reset_consumer('%s.%s', 'myid', 1)",
database, tableName));
+ }
Optional<Consumer> consumer2 = consumerManager.consumer("myid");
assertThat(consumer2).isPresent();
assertThat(consumer2.get().nextSnapshot()).isEqualTo(1);
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index e477adfed..ca22dfcf4 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -37,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -54,13 +55,19 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
public void testDropPartitionWithSinglePartitionKey(boolean hasPk) throws
Exception {
FileStoreTable table = prepareTable(hasPk);
- new DropPartitionAction(
- warehouse,
- database,
- tableName,
-
Collections.singletonList(Collections.singletonMap("partKey0", "0")),
- Collections.emptyMap())
- .run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new DropPartitionAction(
+ warehouse,
+ database,
+ tableName,
+
Collections.singletonList(Collections.singletonMap("partKey0", "0")),
+ Collections.emptyMap())
+ .run();
+ } else {
+ callProcedure(
+ String.format(
+ "CALL drop_partition('%s.%s', 'partKey0 = 0')",
database, tableName));
+ }
SnapshotManager snapshotManager =
getFileStoreTable(tableName).snapshotManager();
Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
@@ -106,13 +113,20 @@ public class DropPartitionActionITCase extends
ActionITCaseBase {
partitions1.put("partKey0", "1");
partitions1.put("partKey1", "0");
- new DropPartitionAction(
- warehouse,
- database,
- tableName,
- Arrays.asList(partitions0, partitions1),
- Collections.emptyMap())
- .run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new DropPartitionAction(
+ warehouse,
+ database,
+ tableName,
+ Arrays.asList(partitions0, partitions1),
+ Collections.emptyMap())
+ .run();
+ } else {
+ callProcedure(
+ String.format(
+ "CALL drop_partition('%s.%s',
'partKey0=0,partKey1=1', 'partKey0=1,partKey1=0')",
+ database, tableName));
+ }
SnapshotManager snapshotManager =
getFileStoreTable(tableName).snapshotManager();
Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
index d562f5160..c3ef32450 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java
@@ -36,9 +36,15 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
import static
org.apache.flink.table.planner.factories.TestValuesTableFactory.changelogRow;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_DELETE;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.MATCHED_UPSERT;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_DELETE;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_BY_SOURCE_UPSERT;
+import static
org.apache.paimon.flink.action.MergeIntoActionFactory.NOT_MATCHED_INSERT;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildDdl;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.buildSimpleQuery;
@@ -110,8 +116,31 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
"dt < '02-28'", "v = v || '_nmu', last_action =
'not_matched_upsert'")
.withNotMatchedBySourceDelete("dt >= '02-28'");
+ String mergeActions =
+ String.format(
+ "%s,%s,%s,%s,%s",
+ MATCHED_UPSERT,
+ MATCHED_DELETE,
+ NOT_MATCHED_INSERT,
+ NOT_MATCHED_BY_SOURCE_UPSERT,
+ NOT_MATCHED_BY_SOURCE_DELETE);
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('%s.T', '', '', 'default.S', 'T.k =
S.k AND T.dt = S.dt', '%s', %s)",
+ database,
+ mergeActions,
+ "'T.v <> S.v AND S.v IS NOT NULL', "
+ + "'v = S.v, last_action =
''matched_upsert''', "
+ + "'S.v IS NULL', "
+ + "'', "
+ + "'S.k, S.v, ''insert'', S.dt', "
+ + "'dt < ''02-28''', "
+ + "'v = v || ''_nmu'', last_action =
''not_matched_upsert''', "
+ + "'dt >= ''02-28'''");
+
validateActionRunResult(
action,
+ procedureStatement,
expected,
Arrays.asList(
changelogRow("+I", 1, "v_1", "creation", "02-27"),
@@ -169,8 +198,14 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
.withMergeCondition("TT.k = S.k AND TT.dt = S.dt")
.withMatchedDelete("S.v IS NULL");
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('%s.T', 'TT', '', 'S', 'TT.k = S.k
AND TT.dt = S.dt', '%s', %s)",
+ inDefault ? database : "test_db", MATCHED_DELETE,
"'S.v IS NULL'");
+
validateActionRunResult(
action,
+ procedureStatement,
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -205,6 +240,11 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
.withMergeCondition("T.k = S.k AND T.dt = S.dt")
.withMatchedDelete("S.v IS NULL");
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('default.T', '', '', '%s', 'T.k = S.k
AND T.dt = S.dt', '%s', %s)",
+ sourceTableName, MATCHED_DELETE, "'S.v IS NULL'");
+
if (!inDefault) {
sEnv.executeSql("USE `default`");
bEnv.executeSql("USE `default`");
@@ -212,6 +252,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
validateActionRunResult(
action,
+ procedureStatement,
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -236,6 +277,7 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String.format(
"CREATE CATALOG test_cat WITH ('type' = 'paimon',
'warehouse' = '%s')",
getTempDirPath());
+ String escapeCatalog = catalog.replaceAll("'", "''");
String id =
TestValuesTableFactory.registerData(
Arrays.asList(
@@ -245,8 +287,9 @@ public class MergeIntoActionITCase extends ActionITCaseBase
{
String ddl =
String.format(
"CREATE TEMPORARY TABLE %s (k INT, v STRING, dt
STRING)\n"
- + "WITH ('connector' = 'values', 'bounded' =
'true', 'data-id' = '%s');",
+ + "WITH ('connector' = 'values', 'bounded' =
'true', 'data-id' = '%s')",
useCatalog ? "S" : "test_cat.`default`.S", id);
+ String escapeDdl = ddl.replaceAll("'", "''");
MergeIntoAction action = new MergeIntoAction(warehouse, database, "T");
@@ -261,8 +304,22 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
action.withMergeCondition("T.k = S.k AND T.dt =
S.dt").withMatchedDelete("S.v IS NULL");
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = S.k
AND T.dt = S.dt', '%s', %s)",
+ database,
+ useCatalog
+ ? String.format(
+ "%s;%s;%s",
+ escapeCatalog, "USE CATALOG test_cat",
escapeDdl)
+ : String.format("%s;%s", escapeCatalog,
escapeDdl),
+ useCatalog ? "S" : "test_cat.default.S",
+ MATCHED_DELETE,
+ "'S.v IS NULL'");
+
validateActionRunResult(
action,
+ procedureStatement,
Arrays.asList(
changelogRow("-D", 4, "v_4", "creation", "02-27"),
changelogRow("-D", 8, "v_8", "creation", "02-28")),
@@ -287,8 +344,18 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
.withMatchedUpsert(null, "*");
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k
AND T.dt = SS.dt', '%s', %s)",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
+ qualified ? "default.SS" : "SS",
+ MATCHED_UPSERT,
+ "'', '*'");
+
validateActionRunResult(
action,
+ procedureStatement,
Arrays.asList(
changelogRow("-U", 1, "v_1", "creation", "02-27"),
changelogRow("+U", 1, "v_1", "unknown", "02-27"),
@@ -321,8 +388,18 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
.withMergeCondition("T.k = SS.k AND T.dt = SS.dt")
.withNotMatchedInsert("SS.k < 12", "*");
+ String procedureStatement =
+ String.format(
+ "CALL merge_into('%s.T', '', '%s', '%s', 'T.k = SS.k
AND T.dt = SS.dt', '%s', %s)",
+ database,
+ "CREATE TEMPORARY VIEW SS AS SELECT k, v, ''unknown'',
dt FROM S",
+ qualified ? "default.SS" : "SS",
+ NOT_MATCHED_INSERT,
+ "'SS.k < 12', '*'");
+
validateActionRunResult(
action,
+ procedureStatement,
Arrays.asList(
changelogRow("+I", 8, "v_8", "unknown", "02-29"),
changelogRow("+I", 11, "v_11", "unknown", "02-29")),
@@ -416,11 +493,18 @@ public class MergeIntoActionITCase extends
ActionITCaseBase {
}
private void validateActionRunResult(
- MergeIntoAction action, List<Row> streamingExpected, List<Row>
batchExpected)
+ MergeIntoAction action,
+ String procedureStatement,
+ List<Row> streamingExpected,
+ List<Row> batchExpected)
throws Exception {
BlockingIterator<Row, Row> iterator =
testStreamingRead(buildSimpleQuery("T"), initialRecords);
- action.run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ action.run();
+ } else {
+ callProcedure(procedureStatement);
+ }
// test streaming read
validateStreamingReadResult(iterator, streamingExpected);
iterator.close();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 97c8637a5..e6f6cea6b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
@@ -65,9 +66,11 @@ public class RollbackToActionITCase extends ActionITCaseBase
{
writeData(rowData(2L, BinaryString.fromString("World")));
writeData(rowData(2L, BinaryString.fromString("Flink")));
- RollbackToAction action =
- new RollbackToAction(warehouse, database, tableName, "2",
Collections.emptyMap());
- action.run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new RollbackToAction(warehouse, database, tableName, "2",
Collections.emptyMap()).run();
+ } else {
+ callProcedure(String.format("CALL rollback_to('%s.%s', 2)",
database, tableName));
+ }
testBatchRead(
"SELECT * FROM `" + tableName + "`",
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
index d0f3e2879..4f44d672f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/TagActionITCase.java
@@ -31,6 +31,7 @@ import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Collections;
+import java.util.concurrent.ThreadLocalRandom;
import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.init;
import static
org.apache.paimon.flink.util.ReadWriteTableTestUtil.testBatchRead;
@@ -65,10 +66,13 @@ public class TagActionITCase extends ActionITCaseBase {
TagManager tagManager = new TagManager(table.fileIO(),
table.location());
- CreateTagAction createTagAction =
- new CreateTagAction(
- warehouse, database, tableName,
Collections.emptyMap(), "tag2", 2);
- createTagAction.run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new CreateTagAction(warehouse, database, tableName,
Collections.emptyMap(), "tag2", 2)
+ .run();
+ } else {
+ callProcedure(
+ String.format("CALL create_tag('%s.%s', 'tag2', 2)",
database, tableName));
+ }
assertThat(tagManager.tagExists("tag2")).isTrue();
// read tag2
@@ -76,9 +80,12 @@ public class TagActionITCase extends ActionITCaseBase {
"SELECT * FROM `" + tableName + "` /*+
OPTIONS('scan.tag-name'='tag2') */",
Arrays.asList(Row.of(1L, "Hi"), Row.of(2L, "Hello")));
- DeleteTagAction deleteTagAction =
- new DeleteTagAction(warehouse, database, tableName,
Collections.emptyMap(), "tag2");
- deleteTagAction.run();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ new DeleteTagAction(warehouse, database, tableName,
Collections.emptyMap(), "tag2")
+ .run();
+ } else {
+ callProcedure(String.format("CALL delete_tag('%s.%s', 'tag2')",
database, tableName));
+ }
assertThat(tagManager.tagExists("tag2")).isFalse();
}
}