This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 399883c99 [flink] Support multiple tables in dedicated compaction job
(#1852)
399883c99 is described below
commit 399883c99fc40f52aeadd28b617427e0639a8b6b
Author: JunZhang <[email protected]>
AuthorDate: Mon Aug 21 17:53:16 2023 +0800
[flink] Support multiple tables in dedicated compaction job (#1852)
---
docs/content/maintenance/multiple-writers.md | 39 +-
.../paimon/flink/action/CompactDatabaseAction.java | 176 +++++++
.../flink/action/CompactDatabaseActionFactory.java | 86 +++
.../services/org.apache.paimon.factories.Factory | 1 +
.../paimon/flink/action/CompactActionITCase.java | 54 +-
.../flink/action/CompactActionITCaseBase.java | 86 +++
.../flink/action/CompactDatabaseActionITCase.java | 574 +++++++++++++++++++++
7 files changed, 964 insertions(+), 52 deletions(-)
diff --git a/docs/content/maintenance/multiple-writers.md
b/docs/content/maintenance/multiple-writers.md
index 17ede6059..2de3a7f97 100644
--- a/docs/content/maintenance/multiple-writers.md
+++ b/docs/content/maintenance/multiple-writers.md
@@ -95,6 +95,23 @@ Run the following command to submit a compaction job for the
table.
[--partition <partition-name>] \
[--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]] \
```
+
+Or run the following command to submit a compaction job for multiple databases.
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact-database \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ [--including-tables <paimon-table-name|name-regular-expr>] \
+ [--excluding-tables <paimon-table-name|name-regular-expr>] \
+ [--catalog-conf <paimon-catalog-conf> [--catalog-conf
<paimon-catalog-conf> ...]]
+```
+
+* `--database` is used to specify which database is to be compacted. In
compact mode, you need to specify a single database name; in compact-database mode,
you can specify multiple databases, and regular expressions are supported.
+* `--including-tables` is used to specify which source tables are to be
compacted. You must use '|' to separate multiple tables, the format is
`databaseName.tableName`, and regular expressions are supported. For example,
specifying "--including-tables db1.t1|db2.+" means to compact table 'db1.t1'
and all tables in the db2 database.
+* `--excluding-tables` is used to specify which source tables are not to be
compacted. The usage is the same as "--including-tables". "--excluding-tables" has
higher priority than "--including-tables" if you specify both.
* `--catalog-conf` is the configuration for Paimon catalog. Each configuration
should be specified in the format `key=value`. See [here]({{< ref
"maintenance/configurations" >}}) for a complete list of catalog configurations.
If you submit a batch job (set `execution.runtime-mode: batch` in Flink's
configuration), all current table files will be compacted. If you submit a
streaming job (set `execution.runtime-mode: streaming` in Flink's
configuration), the job will continuously monitor new changes to the table and
perform compactions as needed.
@@ -105,7 +122,7 @@ If you only want to submit the compaction job and don't
want to wait until the j
{{< /hint >}}
-Example
+Example1: compact table
```bash
<FLINK_HOME>/bin/flink run \
@@ -121,6 +138,19 @@ Example
--catalog-conf s3.secret-key=*****
```
+Example2: compact database
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact-database \
+ --warehouse s3:///path/to/warehouse \
+ --database test_db \
+ --catalog-conf s3.endpoint=https://****.com \
+ --catalog-conf s3.access-key=***** \
+ --catalog-conf s3.secret-key=*****
+```
+
For more usage of the compact action, see
```bash
@@ -128,6 +158,13 @@ For more usage of the compact action, see
/path/to/paimon-flink-action-{{< version >}}.jar \
compact --help
```
+or
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ compact-database --help
+```
{{< /tab >}}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
new file mode 100644
index 000000000..1be9e7b15
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
+import org.apache.paimon.flink.sink.CompactorSinkBuilder;
+import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.data.RowData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Database compact action for Flink. */
+public class CompactDatabaseAction extends ActionBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(CompactDatabaseAction.class);
+
+ private final Pattern includingPattern;
+ private final Pattern excludingPattern;
+ private final String database;
+ private final Map<String, Table> tableMap = new HashMap<>();
+
+ public CompactDatabaseAction(
+ String warehouse,
+ String database,
+ @Nullable String includingTables,
+ @Nullable String excludingTables,
+ Map<String, String> catalogConfig) {
+ super(warehouse, catalogConfig);
+ this.database = database;
+ this.includingPattern = Pattern.compile(includingTables == null ? ".*"
: includingTables);
+ this.excludingPattern = excludingTables == null ? null :
Pattern.compile(excludingTables);
+ }
+
+ private boolean shouldCompactionTable(String paimonFullTableName) {
+ boolean shouldCompaction =
includingPattern.matcher(paimonFullTableName).matches();
+ if (excludingPattern != null) {
+ shouldCompaction =
+ shouldCompaction &&
!excludingPattern.matcher(paimonFullTableName).matches();
+ }
+ if (!shouldCompaction) {
+ LOG.debug("Source table '{}' is excluded.", paimonFullTableName);
+ }
+ return shouldCompaction;
+ }
+
+ public void build(StreamExecutionEnvironment env) {
+ try {
+ Pattern databasePattern = Pattern.compile(database);
+ List<String> databases = catalog.listDatabases();
+ for (String databaseName : databases) {
+ Matcher databaseMatcher =
databasePattern.matcher(databaseName);
+ if (databaseMatcher.matches()) {
+ List<String> tables = catalog.listTables(databaseName);
+ for (String tableName : tables) {
+ String fullTableName = String.format("%s.%s",
databaseName, tableName);
+ if (shouldCompactionTable(fullTableName)) {
+ Table table =
+
catalog.getTable(Identifier.create(databaseName, tableName));
+ if (!(table instanceof FileStoreTable)) {
+ LOG.error(
+ String.format(
+ "Only FileStoreTable supports
compact action. The table type is '%s'.",
+ table.getClass().getName()));
+ continue;
+ }
+ table =
+ table.copy(
+ Collections.singletonMap(
+
CoreOptions.WRITE_ONLY.key(), "false"));
+ tableMap.put(fullTableName, table);
+ } else {
+ LOG.debug("The table {} is excluded.",
fullTableName);
+ }
+ }
+ }
+ }
+ } catch (Catalog.DatabaseNotExistException |
Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
+
+ ReadableConfig conf =
StreamExecutionEnvironmentUtils.getConfiguration(env);
+ boolean isStreaming =
+ conf.get(ExecutionOptions.RUNTIME_MODE) ==
RuntimeExecutionMode.STREAMING;
+ for (Map.Entry<String, Table> entry : tableMap.entrySet()) {
+ FileStoreTable fileStoreTable = (FileStoreTable) entry.getValue();
+ switch (fileStoreTable.bucketMode()) {
+ case UNAWARE:
+ {
+ buildForUnawareBucketCompaction(
+ env,
+ entry.getKey(),
+ (AppendOnlyFileStoreTable) entry.getValue(),
+ isStreaming);
+ break;
+ }
+ case FIXED:
+ case DYNAMIC:
+ default:
+ {
+ buildForTraditionalCompaction(
+ env, entry.getKey(), fileStoreTable,
isStreaming);
+ }
+ }
+ }
+ }
+
+ private void buildForTraditionalCompaction(
+ StreamExecutionEnvironment env,
+ String fullName,
+ FileStoreTable table,
+ boolean isStreaming) {
+
+ CompactorSourceBuilder sourceBuilder = new
CompactorSourceBuilder(fullName, table);
+ CompactorSinkBuilder sinkBuilder = new CompactorSinkBuilder(table);
+
+ DataStreamSource<RowData> source =
+
sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build();
+ sinkBuilder.withInput(source).build();
+ }
+
+ private void buildForUnawareBucketCompaction(
+ StreamExecutionEnvironment env,
+ String fullName,
+ AppendOnlyFileStoreTable table,
+ boolean isStreaming) {
+ UnawareBucketCompactionTopoBuilder unawareBucketCompactionTopoBuilder =
+ new UnawareBucketCompactionTopoBuilder(env, fullName, table);
+
+ unawareBucketCompactionTopoBuilder.withContinuousMode(isStreaming);
+ unawareBucketCompactionTopoBuilder.build();
+ }
+
+ @Override
+ public void run() throws Exception {
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ build(env);
+ execute(env, "Compact database job");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
new file mode 100644
index 000000000..345733616
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseActionFactory.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.flink.api.java.utils.MultipleParameterTool;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create {@link CompactDatabaseAction}. */
+public class CompactDatabaseActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "compact-database";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    /**
+     * Creates a {@link CompactDatabaseAction} from the command line parameters {@code warehouse},
+     * {@code database}, {@code including-tables}, {@code excluding-tables} and {@code
+     * catalog-conf}.
+     *
+     * @param params parsed command line parameters
+     * @return the configured action, never empty
+     */
+    @Override
+    public Optional<Action> create(MultipleParameterTool params) {
+        String warehouse = params.get("warehouse");
+        String database = params.get("database");
+        String includingTables = params.get("including-tables");
+        String excludingTables = params.get("excluding-tables");
+        Map<String, String> catalogConfig = optionalConfigMap(params, "catalog-conf");
+
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(
+                        warehouse, database, includingTables, excludingTables, catalogConfig);
+        return Optional.of(action);
+    }
+
+    /** Prints the usage of the {@code compact-database} action to standard output. */
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"compact-database\" runs a dedicated job for compacting "
+                        + "one or multiple databases.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        // One syntax entry with placeholders only; concrete paths belong in the examples below.
+        System.out.println(
+                "  compact-database --warehouse <warehouse-path> "
+                        + "--database <database-name|name-regular-expr> "
+                        + "[--including-tables <paimon-table-name|name-regular-expr>] "
+                        + "[--excluding-tables <paimon-table-name|name-regular-expr>] "
+                        + "[--catalog-conf <paimon-catalog-conf> "
+                        + "[--catalog-conf <paimon-catalog-conf> ...]]");
+        System.out.println();
+
+        System.out.println(
+                "--database is used to specify which databases are to be compacted. "
+                        + "Regular expression is supported.");
+        System.out.println(
+                "--including-tables is used to specify which source tables are to be compacted. "
+                        + "You must use '|' to separate multiple tables, the format is "
+                        + "`databaseName.tableName`. Regular expression is supported.");
+        System.out.println(
+                "--excluding-tables is used to specify which source tables are not to be "
+                        + "compacted. The usage is the same as --including-tables.");
+        System.out.println(
+                "--excluding-tables has higher priority than --including-tables "
+                        + "if you specified both.");
+        System.out.println();
+
+        System.out.println("Examples:");
+        System.out.println(
+                "  compact-database --warehouse hdfs:///path/to/warehouse --database test_db");
+        System.out.println(
+                "  compact-database --warehouse s3:///path/to/warehouse "
+                        + "--database test_db "
+                        + "--catalog-conf s3.endpoint=https://****.com "
+                        + "--catalog-conf s3.access-key=***** "
+                        + "--catalog-conf s3.secret-key=*****");
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 2e67176dc..c77a3ed38 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -17,6 +17,7 @@ org.apache.paimon.flink.kafka.KafkaLogStoreFactory
### action factories
org.apache.paimon.flink.action.CompactActionFactory
+org.apache.paimon.flink.action.CompactDatabaseActionFactory
org.apache.paimon.flink.action.DropPartitionActionFactory
org.apache.paimon.flink.action.DeleteActionFactory
org.apache.paimon.flink.action.MergeIntoActionFactory
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 4fa96df1d..d2e7e5c17 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -22,8 +22,6 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -43,20 +41,17 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import java.time.Duration;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeoutException;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link CompactAction}. */
-public class CompactActionITCase extends ActionITCaseBase {
+public class CompactActionITCase extends CompactActionITCaseBase {
private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING()};
@@ -174,6 +169,7 @@ public class CompactActionITCase extends ActionITCaseBase {
// first full compaction
validateResult(
table,
+ ROW_TYPE,
scan,
Arrays.asList("+I[1, 100, 15, 20221208]", "+I[1, 100, 15,
20221209]"),
60_000);
@@ -187,6 +183,7 @@ public class CompactActionITCase extends ActionITCaseBase {
// second full compaction
validateResult(
table,
+ ROW_TYPE,
scan,
Arrays.asList(
"+U[1, 101, 15, 20221208]",
@@ -317,49 +314,4 @@ public class CompactActionITCase extends ActionITCaseBase {
return Arrays.asList(partition1, partition2);
}
-
- private void validateResult(
- FileStoreTable table, StreamTableScan scan, List<String> expected,
long timeout)
- throws Exception {
- List<String> actual = new ArrayList<>();
- long start = System.currentTimeMillis();
- while (actual.size() != expected.size()) {
- TableScan.Plan plan = scan.plan();
- actual.addAll(getResult(table.newReadBuilder().newRead(),
plan.splits(), ROW_TYPE));
-
- if (System.currentTimeMillis() - start > timeout) {
- break;
- }
- }
- if (actual.size() != expected.size()) {
- throw new TimeoutException(
- String.format(
- "Cannot collect %s records in %s milliseconds.",
- expected.size(), timeout));
- }
- actual.sort(String::compareTo);
- assertThat(actual).isEqualTo(expected);
- }
-
- private void checkFileAndRowSize(
- FileStoreScan scan, Long expectedSnapshotId, Long timeout, int
fileNum, long rowCount)
- throws Exception {
-
- long start = System.currentTimeMillis();
- while (!Objects.equals(snapshotManager.latestSnapshotId(),
expectedSnapshotId)) {
- Thread.sleep(500);
- if (System.currentTimeMillis() - start > timeout) {
- throw new RuntimeException("can't wait for a compaction.");
- }
- }
-
- List<ManifestEntry> files =
-
scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
- long count = 0;
- for (ManifestEntry file : files) {
- count += file.file().rowCount();
- }
- assertThat(files.size()).isEqualTo(fileNum);
- assertThat(count).isEqualTo(rowCount);
- }
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
new file mode 100644
index 000000000..0f2ada0a9
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Shared helpers for the IT cases of {@link CompactAction} and {@link CompactDatabaseAction}. */
+public class CompactActionITCaseBase extends ActionITCaseBase {
+
+    /**
+     * Keeps planning {@code scan} and reading the planned splits until as many rows as {@code
+     * expected} holds have been collected, then compares the sorted rows with {@code expected}.
+     *
+     * @param table table the splits are read from
+     * @param rowType row type used to render the rows
+     * @param scan scan that is planned repeatedly
+     * @param expected expected rows, in sorted order
+     * @param timeout time budget in milliseconds
+     * @throws TimeoutException if the number of collected rows differs from the expected number
+     *     once the time budget is used up
+     */
+    protected void validateResult(
+            FileStoreTable table,
+            RowType rowType,
+            StreamTableScan scan,
+            List<String> expected,
+            long timeout)
+            throws Exception {
+        final long begin = System.currentTimeMillis();
+        List<String> collected = new ArrayList<>();
+        boolean timedOut = false;
+        while (!timedOut && collected.size() != expected.size()) {
+            TableScan.Plan nextPlan = scan.plan();
+            collected.addAll(
+                    getResult(table.newReadBuilder().newRead(), nextPlan.splits(), rowType));
+            timedOut = System.currentTimeMillis() - begin > timeout;
+        }
+
+        if (collected.size() != expected.size()) {
+            String message =
+                    String.format(
+                            "Cannot collect %s records in %s milliseconds.",
+                            expected.size(), timeout);
+            throw new TimeoutException(message);
+        }
+
+        collected.sort(String::compareTo);
+        assertThat(collected).isEqualTo(expected);
+    }
+
+    /**
+     * Waits until the latest snapshot id equals {@code expectedSnapshotId}, polling every 500
+     * milliseconds, then checks the number of added files and their total row count in that
+     * snapshot.
+     *
+     * @param scan scan used to list the files of the latest snapshot
+     * @param expectedSnapshotId snapshot id to wait for
+     * @param timeout time budget in milliseconds
+     * @param fileNum expected number of added files
+     * @param rowCount expected sum of the row counts of the added files
+     * @throws RuntimeException if the snapshot does not show up within the time budget
+     */
+    protected void checkFileAndRowSize(
+            FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
+            throws Exception {
+        final long begin = System.currentTimeMillis();
+        while (true) {
+            if (Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) {
+                break;
+            }
+            Thread.sleep(500);
+            if (System.currentTimeMillis() - begin > timeout) {
+                throw new RuntimeException("can't wait for a compaction.");
+            }
+        }
+
+        List<ManifestEntry> addedFiles =
+                scan.withSnapshot(snapshotManager.latestSnapshotId()).plan().files(FileKind.ADD);
+        long totalRows = addedFiles.stream().mapToLong(entry -> entry.file().rowCount()).sum();
+
+        assertThat(addedFiles.size()).isEqualTo(fileNum);
+        assertThat(totalRows).isEqualTo(rowCount);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
new file mode 100644
index 000000000..f0b83d989
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -0,0 +1,574 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.StreamWriteBuilder;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CommonTestUtils;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link CompactDatabaseAction}. */
+public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
+
+    // Databases and tables used by the tests; every table is created in every database.
+    private static final String[] DATABASE_NAMES = {"db1", "db2"};
+    private static final String[] TABLE_NAMES = {"t1", "t2"};
+    // Row type per table name, filled in beforeAll().
+    private static final Map<String, RowType> ROW_TYPE_MAP = new HashMap<>(TABLE_NAMES.length);
+
+ @BeforeAll
+ public static void beforeAll() {
+ // set different datatype and RowType
+ DataType[] dataTypes1 =
+ new DataType[] {
+ DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
DataTypes.STRING()
+ };
+ DataType[] dataTypes2 =
+ new DataType[] {
+ DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT(),
DataTypes.STRING()
+ };
+
+ ROW_TYPE_MAP.put("t1", RowType.of(dataTypes1, new String[] {"k", "v",
"hh", "dt"}));
+ ROW_TYPE_MAP.put("t2", RowType.of(dataTypes2, new String[] {"k", "v1",
"hh", "dt"}));
+ }
+
+    /**
+     * Creates the database if it is missing, creates the table in it and loads the table back
+     * from the catalog.
+     *
+     * @return the freshly created table, cast to {@link FileStoreTable}
+     */
+    private FileStoreTable createTable(
+            String databaseName,
+            String tableName,
+            RowType rowType,
+            List<String> partitionKeys,
+            List<String> primaryKeys,
+            Map<String, String> options)
+            throws Exception {
+        Catalog testCatalog = catalog();
+        Identifier tableId = Identifier.create(databaseName, tableName);
+
+        // ignoreIfExists = true: several tables share one database
+        testCatalog.createDatabase(databaseName, true);
+
+        Schema schema = new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, "");
+        testCatalog.createTable(tableId, schema, false);
+
+        return (FileStoreTable) testCatalog.getTable(tableId);
+    }
+
+    /**
+     * Writes two batches into every table of every database with {@code write-only} enabled, runs
+     * a batch compact-database job over {@code db1|db2} and checks that every table ends up with
+     * a COMPACT snapshot and a single data file per split.
+     */
+    @Test
+    @Timeout(60)
+    public void testBatchCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options);
+                tables.add(table);
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // the second column is INT in t1 and BIGINT in t2
+                Object value = null;
+                if (tableName.equals("t1")) {
+                    value = 100;
+                } else if (tableName.equals("t2")) {
+                    value = 100L;
+                }
+
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                writeData(
+                        rowData(2, value, 15, BinaryString.fromString("20221208")),
+                        rowData(2, value, 16, BinaryString.fromString("20221208")),
+                        rowData(2, value, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(2);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, "db1|db2", null, null, new HashMap<>()).build(env);
+        env.execute();
+
+        for (FileStoreTable table : tables) {
+            // Point the shared field at the table under test. Left untouched it still refers to
+            // the last table created above, so the latest snapshot id would be read from the
+            // wrong table.
+            snapshotManager = table.snapshotManager();
+            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(3);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+
+            List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+            assertThat(splits.size()).isEqualTo(3);
+            for (DataSplit split : splits) {
+                assertThat(split.dataFiles().size()).isEqualTo(1);
+            }
+        }
+    }
+
+    /** Only {@code db1.t1} is included, so the other three tables must stay uncompacted. */
+    @Test
+    @Timeout(60)
+    public void includeTableCompaction() throws Exception {
+        List<Identifier> compacted = Collections.singletonList(Identifier.fromString("db1.t1"));
+        List<Identifier> untouched =
+                Arrays.asList(
+                        Identifier.fromString("db1.t2"),
+                        Identifier.fromString("db2.t1"),
+                        Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl("db1.t1", null, compacted, untouched);
+    }
+
+    /** Only {@code db2.t2} is excluded, so the other three tables must be compacted. */
+    @Test
+    @Timeout(60)
+    public void excludeTableCompaction() throws Exception {
+        List<Identifier> compacted =
+                Arrays.asList(
+                        Identifier.fromString("db1.t1"),
+                        Identifier.fromString("db1.t2"),
+                        Identifier.fromString("db2.t1"));
+        List<Identifier> untouched = Collections.singletonList(Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl(null, "db2.t2", compacted, untouched);
+    }
+
+    /**
+     * Includes all of db1 plus {@code db2.t1} and excludes {@code db1.t2}; the excluding pattern
+     * is expected to win for {@code db1.t2}.
+     */
+    @Test
+    @Timeout(60)
+    public void includeAndExcludeTableCompaction() throws Exception {
+        List<Identifier> compacted =
+                Arrays.asList(Identifier.fromString("db1.t1"), Identifier.fromString("db2.t1"));
+        List<Identifier> untouched =
+                Arrays.asList(Identifier.fromString("db1.t2"), Identifier.fromString("db2.t2"));
+
+        includingAndExcludingTablesImpl("db1.+|db2.t1", "db1.t2", compacted, untouched);
+    }
+
+    /**
+     * Creates and fills every table of every database, runs a batch compact-database job over
+     * {@code db1|db2} with the given patterns, and verifies which tables were compacted.
+     *
+     * @param includingPattern including-tables pattern handed to the action, may be null
+     * @param excludesPattern excluding-tables pattern handed to the action, may be null
+     * @param includeTables tables expected to end with a COMPACT snapshot
+     * @param excludeTables tables expected to keep their last APPEND snapshot
+     */
+    private void includingAndExcludingTablesImpl(
+            String includingPattern,
+            String excludesPattern,
+            List<Identifier> includeTables,
+            List<Identifier> excludeTables)
+            throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        List<FileStoreTable> expectCompacted = new ArrayList<>();
+        List<FileStoreTable> expectUntouched = new ArrayList<>();
+
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                tableOptions);
+                Identifier tableId = Identifier.create(dbName, tableName);
+                if (includeTables.contains(tableId)) {
+                    expectCompacted.add(table);
+                } else if (excludeTables.contains(tableId)) {
+                    expectUntouched.add(table);
+                }
+
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder writeBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = writeBuilder.newWrite();
+                commit = writeBuilder.newCommit();
+
+                // the second column is INT in t1 and BIGINT in t2
+                Object value;
+                switch (tableName) {
+                    case "t1":
+                        value = 100;
+                        break;
+                    case "t2":
+                        value = 100L;
+                        break;
+                    default:
+                        value = null;
+                        break;
+                }
+
+                for (int key = 1; key <= 2; key++) {
+                    writeData(
+                            rowData(key, value, 15, BinaryString.fromString("20221208")),
+                            rowData(key, value, 16, BinaryString.fromString("20221208")),
+                            rowData(key, value, 15, BinaryString.fromString("20221209")));
+                }
+
+                Snapshot latest = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(latest.id()).isEqualTo(2);
+                assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(
+                        warehouse, "db1|db2", includingPattern, excludesPattern, new HashMap<>());
+        action.build(env);
+        env.execute();
+
+        // compacted tables: one more snapshot of kind COMPACT, one file per split
+        for (FileStoreTable table : expectCompacted) {
+            assertLatestSnapshotAndFiles(table, 3, Snapshot.CommitKind.COMPACT, 1);
+        }
+        // untouched tables: still at the second APPEND snapshot, two files per split
+        for (FileStoreTable table : expectUntouched) {
+            assertLatestSnapshotAndFiles(table, 2, Snapshot.CommitKind.APPEND, 2);
+        }
+    }
+
+    /**
+     * Points {@code snapshotManager} at the given table and asserts the id and kind of its latest
+     * snapshot, that it has three splits, and the number of data files in each split.
+     */
+    private void assertLatestSnapshotAndFiles(
+            FileStoreTable table,
+            long expectedSnapshotId,
+            Snapshot.CommitKind expectedKind,
+            int expectedFilesPerSplit) {
+        snapshotManager = table.snapshotManager();
+        Snapshot latest = table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
+
+        assertThat(latest.id()).isEqualTo(expectedSnapshotId);
+        assertThat(latest.commitKind()).isEqualTo(expectedKind);
+
+        List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
+        assertThat(splits.size()).isEqualTo(3);
+        for (DataSplit split : splits) {
+            assertThat(split.dataFiles().size()).isEqualTo(expectedFilesPerSplit);
+        }
+    }
+
+    /**
+     * Runs {@link CompactDatabaseAction} as a streaming job over the {@code db1|db2} database
+     * pattern and verifies, for every table created by this test, that:
+     *
+     * <ul>
+     *   <li>no changelog is readable before the first full compaction,
+     *   <li>the first and second full compactions produce the expected changelog rows, and
+     *   <li>the dedicated compaction job expires old snapshots.
+     * </ul>
+     *
+     * @throws Exception if table creation, writing, job submission or validation fails
+     */
+    @Test
+    public void testStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+        options.put(
+                FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
+                "1s");
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        options.put(CoreOptions.WRITE_ONLY.key(), "true");
+        // test that dedicated compact job will expire snapshots
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+        options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+        for (String dbName : DATABASE_NAMES) {
+            for (String tableName : TABLE_NAMES) {
+                FileStoreTable table =
+                        createTable(
+                                dbName,
+                                tableName,
+                                ROW_TYPE_MAP.get(tableName),
+                                Arrays.asList("dt", "hh"),
+                                Arrays.asList("dt", "hh", "k"),
+                                options);
+                tables.add(table);
+
+                // writeData(...) operates on these shared fields, so point them at this table
+                snapshotManager = table.snapshotManager();
+                StreamWriteBuilder streamWriteBuilder =
+                        table.newStreamWriteBuilder().withCommitUser(commitUser);
+                write = streamWriteBuilder.newWrite();
+                commit = streamWriteBuilder.newCommit();
+
+                // the second column is an Integer for t1 and a Long for t2
+                Object value = null;
+                if (tableName.equals("t1")) {
+                    value = 100;
+                } else if (tableName.equals("t2")) {
+                    value = 100L;
+                }
+
+                // base records
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+                Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+                assertThat(snapshot.id()).isEqualTo(1);
+                assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+
+                // no full compaction has happened, so plan should be empty
+                StreamTableScan scan = table.newReadBuilder().newStreamScan();
+                TableScan.Plan plan = scan.plan();
+                assertThat(plan.splits()).isEmpty();
+            }
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        // randomly run with parallelism 1 or 2
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, "db1|db2", null, null, new HashMap<>()).build(env);
+        JobClient client = env.executeAsync();
+
+        for (FileStoreTable table : tables) {
+            StreamTableScan scan = table.newReadBuilder().newStreamScan();
+            // first full compaction
+            validateResult(
+                    table,
+                    ROW_TYPE_MAP.get(table.name()),
+                    scan,
+                    Arrays.asList(
+                            "+I[1, 100, 15, 20221208]",
+                            "+I[1, 100, 15, 20221209]",
+                            "+I[1, 100, 16, 20221208]"),
+                    60_000);
+
+            Object value = null;
+            String tName = table.name();
+            if (tName.equals("t1")) {
+                value = 101;
+            } else if (tName.equals("t2")) {
+                value = 101L;
+            }
+
+            snapshotManager = table.snapshotManager();
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+
+            // incremental records
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            // second full compaction
+            validateResult(
+                    table,
+                    ROW_TYPE_MAP.get(table.name()),
+                    scan,
+                    Arrays.asList(
+                            "+U[1, 101, 15, 20221208]",
+                            "+U[1, 101, 15, 20221209]",
+                            "+U[1, 101, 16, 20221208]",
+                            "-U[1, 100, 15, 20221208]",
+                            "-U[1, 100, 15, 20221209]",
+                            "-U[1, 100, 16, 20221208]"),
+                    60_000);
+
+            // assert dedicated compact job will expire snapshots: with min = max = 3 retained
+            // snapshots, earliest must end up exactly two behind latest.
+            // Timeout and polling pause are milliseconds, matching the message below and the
+            // validateResult timeouts above (ofSeconds would wait ~16 hours, polling every 100 s).
+            CommonTestUtils.waitUtil(
+                    () ->
+                            snapshotManager.latestSnapshotId() - 2
+                                    == snapshotManager.earliestSnapshotId(),
+                    Duration.ofMillis(60_000),
+                    Duration.ofMillis(100),
+                    String.format(
+                            "Cannot validate snapshot expiration in %s milliseconds.", 60_000));
+        }
+
+        // block until the cancellation has completed, as testUnawareBucketStreamingCompact does,
+        // so the job is no longer running when the test returns
+        client.cancel().get();
+    }
+
+    /**
+     * Runs {@link CompactDatabaseAction} as a streaming job over a single database whose tables
+     * are created with {@code bucket = -1}, and checks the file and row counts after the first
+     * and second compaction of each table.
+     *
+     * @throws Exception if table creation, writing, job submission or validation fails
+     */
+    @Test
+    public void testUnawareBucketStreamingCompact() throws Exception {
+        Map<String, String> options = new HashMap<>();
+        options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+        // bucket = -1 puts the tables in the unaware-bucket mode this test is named after.
+        // NOTE(review): min = max = 2 presumably makes the two files written per table below
+        // eligible for compaction - confirm against the CoreOptions documentation.
+        options.put(CoreOptions.BUCKET.key(), "-1");
+        options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        List<FileStoreTable> tables = new ArrayList<>();
+        for (String tableName : TABLE_NAMES) {
+            FileStoreTable table =
+                    createTable(
+                            database,
+                            tableName,
+                            ROW_TYPE_MAP.get(tableName),
+                            Collections.singletonList("k"),
+                            Collections.emptyList(),
+                            options);
+            tables.add(table);
+
+            // writeData(...) operates on these shared fields, so point them at this table
+            snapshotManager = table.snapshotManager();
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+
+            // the second column is an Integer for t1 and a Long for t2
+            Object value = null;
+            if (tableName.equals("t1")) {
+                value = 100;
+            } else if (tableName.equals("t2")) {
+                value = 100L;
+            }
+
+            // base records, written twice so that each table ends up at snapshot 2
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+            assertThat(snapshot.id()).isEqualTo(2);
+            assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        env.getCheckpointConfig().setCheckpointInterval(500);
+        // randomly run with parallelism 1 or 2
+        env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        new CompactDatabaseAction(warehouse, database, null, null, new HashMap<>()).build(env);
+        JobClient client = env.executeAsync();
+
+        for (FileStoreTable table : tables) {
+            FileStoreScan storeScan = table.store().newScan();
+
+            snapshotManager = table.snapshotManager();
+            StreamWriteBuilder streamWriteBuilder =
+                    table.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = streamWriteBuilder.newWrite();
+            commit = streamWriteBuilder.newCommit();
+
+            // first compaction, snapshot will be 3
+            checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+
+            Object value = null;
+            String tName = table.name();
+            if (tName.equals("t1")) {
+                value = 101;
+            } else if (tName.equals("t2")) {
+                value = 101L;
+            }
+
+            writeData(
+                    rowData(1, value, 15, BinaryString.fromString("20221208")),
+                    rowData(1, value, 16, BinaryString.fromString("20221208")),
+                    rowData(1, value, 15, BinaryString.fromString("20221209")));
+
+            // second compaction, snapshot will be 5
+            checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+        }
+
+        // block until the streaming job has actually been cancelled
+        client.cancel().get();
+    }
+
+    /**
+     * Batch-mode check for tables created with {@code bucket = -1}: each table receives two
+     * identical appends, then a single {@link CompactDatabaseAction} batch job runs over the
+     * database and the resulting file and row counts are verified per table.
+     *
+     * @throws Exception if table creation, writing, job execution or validation fails
+     */
+    @Test
+    public void testUnawareBucketBatchCompact() throws Exception {
+        // bucket = -1 puts the tables in the unaware-bucket mode this test is named after
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+        tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+
+        List<FileStoreTable> createdTables = new ArrayList<>();
+        for (String name : TABLE_NAMES) {
+            FileStoreTable created =
+                    createTable(
+                            database,
+                            name,
+                            ROW_TYPE_MAP.get(name),
+                            Collections.singletonList("k"),
+                            Collections.emptyList(),
+                            tableOptions);
+            createdTables.add(created);
+
+            // writeData(...) operates on these shared fields, so point them at this table
+            snapshotManager = created.snapshotManager();
+            StreamWriteBuilder writeBuilder =
+                    created.newStreamWriteBuilder().withCommitUser(commitUser);
+            write = writeBuilder.newWrite();
+            commit = writeBuilder.newCommit();
+
+            // the second column is an Integer for t1 and a Long for t2
+            Object value;
+            switch (name) {
+                case "t1":
+                    value = 100;
+                    break;
+                case "t2":
+                    value = 100L;
+                    break;
+                default:
+                    value = null;
+                    break;
+            }
+
+            // base records: the same three rows are written twice, one snapshot per write
+            for (int round = 0; round < 2; round++) {
+                writeData(
+                        rowData(1, value, 15, BinaryString.fromString("20221208")),
+                        rowData(1, value, 16, BinaryString.fromString("20221208")),
+                        rowData(1, value, 15, BinaryString.fromString("20221209")));
+            }
+
+            long latestId = snapshotManager.latestSnapshotId();
+            Snapshot latest = snapshotManager.snapshot(latestId);
+            assertThat(latest.id()).isEqualTo(2);
+            assertThat(latest.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+
+        StreamExecutionEnvironment batchEnv =
+                StreamExecutionEnvironment.getExecutionEnvironment();
+        batchEnv.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        // randomly run with parallelism 1 or 2
+        batchEnv.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
+        CompactDatabaseAction action =
+                new CompactDatabaseAction(warehouse, database, null, null, new HashMap<>());
+        action.build(batchEnv);
+        batchEnv.execute();
+
+        for (FileStoreTable compacted : createdTables) {
+            FileStoreScan scan = compacted.store().newScan();
+            snapshotManager = compacted.snapshotManager();
+            // first compaction, snapshot will be 3.
+            checkFileAndRowSize(scan, 3L, 0L, 1, 6);
+        }
+    }
+}