This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f31cd18f2 [flink] Support compact procedure (#2013)
f31cd18f2 is described below
commit f31cd18f2a679c0b002a22b7222c00a256f93173
Author: yuzelin <[email protected]>
AuthorDate: Tue Sep 19 10:11:14 2023 +0800
[flink] Support compact procedure (#2013)
---
.github/workflows/e2e-tests-1.16-jdk11.yml | 4 +-
.../org/apache/paimon/catalog/AbstractCatalog.java | 6 +-
.../apache/paimon/catalog/FileSystemCatalog.java | 2 +-
paimon-flink/paimon-flink-1.18/pom.xml | 85 +++++++++
.../action/cdc/kafka/KafkaSyncDatabaseAction.java | 5 -
.../action/cdc/kafka/KafkaSyncTableAction.java | 5 -
.../cdc/mongodb/MongoDBSyncDatabaseAction.java | 5 -
.../action/cdc/mongodb/MongoDBSyncTableAction.java | 5 -
.../action/cdc/mysql/MySqlSyncDatabaseAction.java | 5 -
.../action/cdc/mysql/MySqlSyncTableAction.java | 5 -
.../flink/action/cdc/CdcActionITCaseBase.java | 21 ++-
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 3 +-
.../cdc/kafka/KafkaCanalSyncTableActionITCase.java | 3 +-
.../mongodb/MongoDBSyncDatabaseActionITCase.java | 3 +-
.../cdc/mongodb/MongoDBSyncTableActionITCase.java | 3 +-
.../cdc/mysql/MySqlSyncDatabaseActionITCase.java | 3 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 3 +-
.../java/org/apache/paimon/flink/FlinkCatalog.java | 2 +-
.../org/apache/paimon/flink/action/ActionBase.java | 11 +-
.../paimon/flink/action/CompactActionFactory.java | 23 +--
.../paimon/flink/action/SortCompactAction.java | 14 +-
.../paimon/flink/action/TableActionBase.java | 2 -
.../paimon/flink/procedure/CompactProcedure.java | 118 ++++++++++++
.../paimon/flink/procedure/ProcedureBase.java | 59 ++++++
.../paimon/flink/procedure/ProcedureUtil.java | 19 +-
.../paimon/flink/action/ActionITCaseBase.java | 65 ++++++-
.../paimon/flink/action/CompactActionITCase.java | 210 ++++++++++-----------
.../flink/action/CompactActionITCaseBase.java | 6 +-
.../flink/action/CompactDatabaseActionITCase.java | 47 ++---
.../paimon/flink/action/DeleteActionITCase.java | 4 +-
.../flink/action/DropPartitionActionITCase.java | 5 +-
.../flink/action/RollbackToActionITCase.java | 2 -
...ionITCase.java => SortCompactActionITCase.java} | 68 +++----
paimon-flink/pom.xml | 1 +
.../java/org/apache/paimon/hive/HiveCatalog.java | 2 +-
35 files changed, 542 insertions(+), 282 deletions(-)
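
For orientation before the diff: the heart of this change is the new `compact` procedure exposed through Flink SQL's CALL syntax (see the CompactProcedure Javadoc further down). Below is a minimal sketch of how a user might invoke it from a Java program, modeled on the `callProcedure` helper this commit adds to ActionITCaseBase; the warehouse path and table name are placeholders, not part of the commit.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CompactProcedureExample {
    public static void main(String[] args) {
        // Batch mode mirrors the batch branch of ActionITCaseBase#callProcedure.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // '/tmp/paimon' is a placeholder warehouse path.
        tEnv.executeSql(
                "CREATE CATALOG paimon WITH ('type'='paimon', 'warehouse'='/tmp/paimon')");
        tEnv.useCatalog("paimon");

        // Compact two partitions of default.T; the '' arguments are the
        // order-strategy/order-by placeholders described in the CompactProcedure Javadoc.
        tEnv.executeSql(
                "CALL compact('default.T', '', '', 'dt=20221208,hh=15', 'dt=20221209,hh=15')");
    }
}
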
diff --git a/.github/workflows/e2e-tests-1.16-jdk11.yml b/.github/workflows/e2e-tests-1.16-jdk11.yml
index 9d83302bd..1ca1efdab 100644
--- a/.github/workflows/e2e-tests-1.16-jdk11.yml
+++ b/.github/workflows/e2e-tests-1.16-jdk11.yml
@@ -51,7 +51,7 @@ jobs:
. .github/workflows/utils.sh
jvm_timezone=$(random_timezone)
echo "JVM timezone is set to $jvm_timezone"
- mvn -T 1C -B clean install -DskipTests
- mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone
+ mvn -T 1C -B clean install -DskipTests -Pflink-1.16
+ mvn -T 1C -B test -pl paimon-e2e-tests -Duser.timezone=$jvm_timezone -Pflink-1.16
env:
MAVEN_OPTS: -Xmx4096m
\ No newline at end of file
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 6d212e908..29cd15747 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -157,7 +157,11 @@ public abstract class AbstractCatalog implements Catalog {
}
}
- protected abstract String warehouse();
+ public abstract String warehouse();
+
+ public Map<String, String> options() {
+ return catalogOptions;
+ }
protected abstract TableSchema getDataTableSchema(Identifier identifier)
throws TableNotExistException;
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index e8468cc1b..a9dfb6e29 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -231,7 +231,7 @@ public class FileSystemCatalog extends AbstractCatalog {
public void close() throws Exception {}
@Override
- protected String warehouse() {
+ public String warehouse() {
return warehouse.toString();
}
}
diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml
new file mode 100644
index 000000000..2df59e0ea
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/pom.xml
@@ -0,0 +1,85 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink</artifactId>
+ <version>0.6-SNAPSHOT</version>
+ </parent>
+
+ <packaging>jar</packaging>
+
+ <artifactId>paimon-flink-1.18</artifactId>
+ <name>Paimon : Flink : 1.18</name>
+
+ <properties>
+ <flink.version>1.18-SNAPSHOT</flink.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-flink-cdc</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>shade-paimon</id>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes combine.children="append">
+ <include>org.apache.paimon:paimon-flink-common</include>
+ <include>org.apache.paimon:paimon-flink-cdc</include>
+ </includes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
index 155b0b5ea..3ad479a68 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncDatabaseAction.java
@@ -202,11 +202,6 @@ public class KafkaSyncDatabaseAction extends ActionBase {
return tableConfig;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
index d8669eba3..0b362d4fb 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/kafka/KafkaSyncTableAction.java
@@ -203,11 +203,6 @@ public class KafkaSyncTableAction extends ActionBase {
return tableConfig;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
index f6043824c..ea4d7a850 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseAction.java
@@ -184,11 +184,6 @@ public class MongoDBSyncDatabaseAction extends ActionBase {
return tableConfig;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
index ed868a0ec..1c6b78ae3 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableAction.java
@@ -201,11 +201,6 @@ public class MongoDBSyncTableAction extends ActionBase {
return tableConfig;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
index 8fc1ad16e..29c8261f8 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java
@@ -372,11 +372,6 @@ public class MySqlSyncDatabaseAction extends ActionBase {
return excludedTables;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
@VisibleForTesting
public Map<String, String> tableConfig() {
return tableConfig;
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
index 3fca52255..acbec8eba 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableAction.java
@@ -280,11 +280,6 @@ public class MySqlSyncTableAction extends ActionBase {
return tableConfig;
}
- @VisibleForTesting
- public Map<String, String> catalogConfig() {
- return catalogConfig;
- }
-
// ------------------------------------------------------------------------
// Flink run methods
// ------------------------------------------------------------------------
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index b4f6dcf48..f4e1a4577 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.action.cdc;
-import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
@@ -28,7 +27,11 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,9 +53,19 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
private static final Logger LOG = LoggerFactory.getLogger(CdcActionITCaseBase.class);
- protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
- Identifier identifier = Identifier.create(database, tableName);
- return (FileStoreTable) catalog.getTable(identifier);
+ protected StreamExecutionEnvironment env;
+
+ @BeforeEach
+ public void setEnv() {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+ env.enableCheckpointing(1000);
+ env.setRestartStrategy(RestartStrategies.noRestart());
+ }
+
+ @AfterEach
+ public void closeEnv() throws Exception {
+ env.close();
}
protected void waitingTables(String... tables) throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 30a50afe3..79c01d474 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -566,8 +566,7 @@ public class KafkaCanalSyncDatabaseActionITCase extends KafkaActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
index 6b50274e9..d21dad37e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncTableActionITCase.java
@@ -953,8 +953,7 @@ public class KafkaCanalSyncTableActionITCase extends KafkaActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
index 0d93200bd..2a06b4fc4 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.java
@@ -126,8 +126,7 @@ public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
index 75642b89b..bd4d78f04 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java
@@ -158,8 +158,7 @@ public class MongoDBSyncTableActionITCase extends MongoDBActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
index d41bfe6e7..8990a73b7 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.java
@@ -1190,8 +1190,7 @@ public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 4b3c6ef13..d0e725c1e 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -851,8 +851,7 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
.withTableConfig(Collections.singletonMap("table-key", "table-value"))
.build();
- assertThat(action.catalogConfig())
- .containsExactlyEntriesOf(Collections.singletonMap("catalog-key", "catalog-value"));
+ assertThat(action.catalogConfig()).containsEntry("catalog-key", "catalog-value");
assertThat(action.tableConfig())
.containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 406969ea5..d15068935 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -891,7 +891,7 @@ public class FlinkCatalog extends AbstractCatalog {
*/
public Procedure getProcedure(ObjectPath procedurePath)
throws ProcedureNotExistException, CatalogException {
- return ProcedureUtil.getProcedure(procedurePath.getObjectName())
+ return ProcedureUtil.getProcedure(catalog, procedurePath.getObjectName())
.orElseThrow(() -> new ProcedureNotExistException(name, procedurePath));
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
index 6a4477f47..9c2687bd5 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.action;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
@@ -43,9 +44,7 @@ import java.util.stream.Collectors;
/** Abstract base of {@link Action} for table. */
public abstract class ActionBase implements Action {
- private final Options catalogOptions;
-
- protected final Map<String, String> catalogConfig;
+ protected final Options catalogOptions;
protected final Catalog catalog;
protected final FlinkCatalog flinkCatalog;
protected final String catalogName = "paimon-" + UUID.randomUUID();
@@ -53,7 +52,6 @@ public abstract class ActionBase implements Action {
protected final StreamTableEnvironment batchTEnv;
public ActionBase(String warehouse, Map<String, String> catalogConfig) {
- this.catalogConfig = catalogConfig;
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);
@@ -110,4 +108,9 @@ public abstract class ActionBase implements Action {
return true;
}
+
+ @VisibleForTesting
+ public Map<String, String> catalogConfig() {
+ return catalogOptions.toMap();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
index b74bd8b2f..cb53c5db7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactActionFactory.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.action;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.MultipleParameterTool;
-import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -44,24 +43,10 @@ public class CompactActionFactory implements ActionFactory {
CompactAction action;
if (params.has("order-strategy")) {
- SortCompactAction sortCompactAction =
- new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
-
- String strategy = params.get("order-strategy");
- sortCompactAction.withOrderStrategy(strategy);
-
- if (params.has("order-by")) {
- String sqlOrderBy = params.get("order-by");
- if (sqlOrderBy == null) {
- throw new IllegalArgumentException("Please specify \"order-by\".");
- }
- sortCompactAction.withOrderColumns(Arrays.asList(sqlOrderBy.split(",")));
- } else {
- throw new IllegalArgumentException(
- "Please specify order columns in parameter --order-by.");
- }
-
- action = sortCompactAction;
+ action =
+ new SortCompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig)
+ .withOrderStrategy(params.get("order-strategy"))
+ .withOrderColumns(getRequiredValue(params, "order-by").split(","));
} else {
action = new CompactAction(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 16557a508..522aace00 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -38,10 +38,12 @@ import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -126,11 +128,17 @@ public class SortCompactAction extends CompactAction {
flinkSinkBuilder.build();
}
- public void withOrderStrategy(String sortStrategy) {
+ public SortCompactAction withOrderStrategy(String sortStrategy) {
this.sortStrategy = sortStrategy;
+ return this;
}
- public void withOrderColumns(List<String> orderColumns) {
- this.orderColumns = orderColumns;
+ public SortCompactAction withOrderColumns(String... orderColumns) {
+ return withOrderColumns(Arrays.asList(orderColumns));
+ }
+
+ public SortCompactAction withOrderColumns(List<String> orderColumns) {
+ this.orderColumns = orderColumns.stream().map(String::trim).collect(Collectors.toList());
+ return this;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
index 3b8147a3e..02b8e1943 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java
@@ -56,8 +56,6 @@ public abstract class TableActionBase extends ActionBase {
try {
table = catalog.getTable(identifier);
} catch (Catalog.TableNotExistException e) {
- LOG.error("Table doesn't exist in given path.", e);
- System.err.println("Table doesn't exist in given path.");
throw new RuntimeException(e);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
new file mode 100644
index 000000000..cb8aa3162
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactProcedure.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.CompactAction;
+import org.apache.paimon.flink.action.SortCompactAction;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.flink.action.ActionFactory.parseCommaSeparatedKeyValues;
+
+/**
+ * Compact procedure. Usage:
+ *
+ * <pre><code>
+ * -- compact a table (tableId should be 'database_name.table_name')
+ * CALL compact('tableId')
+ *
+ * -- compact a table with sorting
+ * CALL compact('tableId', 'order-strategy', 'order-by-columns')
+ *
+ * -- compact specific partitions ('pt1=A,pt2=a', 'pt1=B,pt2=b', ...)
+ * -- NOTE: if you don't need sorting but want to specify partitions, use '' as a placeholder
+ * CALL compact('tableId', '', '', partition1, partition2, ...)
+ * </code></pre>
+ */
+public class CompactProcedure extends ProcedureBase {
+
+ public CompactProcedure(Catalog catalog) {
+ super(catalog);
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
+ return call(procedureContext, tableId, "", "");
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String orderStrategy,
+ String orderByColumns)
+ throws Exception {
+ return call(procedureContext, tableId, orderStrategy, orderByColumns, new String[0]);
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String orderStrategy,
+ String orderByColumns,
+ String... partitionStrings)
+ throws Exception {
+ String warehouse = ((AbstractCatalog) catalog).warehouse();
+ Map<String, String> catalogOptions = ((AbstractCatalog) catalog).options();
+ Identifier identifier = Identifier.fromString(tableId);
+ CompactAction action;
+ String jobName;
+ if (orderStrategy.isEmpty() && orderByColumns.isEmpty()) {
+ action =
+ new CompactAction(
+ warehouse,
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ catalogOptions);
+ jobName = "Compact Job";
+ } else if (!orderStrategy.isEmpty() && !orderByColumns.isEmpty()) {
+ action =
+ new SortCompactAction(
+ warehouse,
+ identifier.getDatabaseName(),
+ identifier.getObjectName(),
+ catalogOptions)
+ .withOrderStrategy(orderStrategy)
+ .withOrderColumns(orderByColumns.split(","));
+ jobName = "Sort Compact Job";
+ } else {
+ throw new IllegalArgumentException(
+ "You must specify both 'order strategy' and 'order by columns'.");
+ }
+
+ if (partitionStrings.length != 0) {
+ List<Map<String, String>> partitions = new ArrayList<>();
+ for (String partition : partitionStrings) {
+ partitions.add(parseCommaSeparatedKeyValues(partition));
+ }
+ action.withPartitions(partitions);
+ }
+
+ StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ action.build(env);
+
+ return execute(env, jobName);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
new file mode 100644
index 000000000..c44fe2701
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.procedures.Procedure;
+
+import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+
+/** Base implementation for flink {@link Procedure}. */
+public class ProcedureBase implements Procedure {
+
+ protected final Catalog catalog;
+
+ ProcedureBase(Catalog catalog) {
+ this.catalog = catalog;
+ }
+
+ protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
+ throws Exception {
+ ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+ String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
+ JobClient jobClient = env.executeAsync(name);
+ String jobId = jobClient.getJobID().toString();
+ if (conf.get(TABLE_DML_SYNC)) {
+ try {
+ jobClient.getJobExecutionResult().get();
+ } catch (Exception e) {
+ throw new TableException(String.format("Failed to wait for job '%s' to finish", jobId), e);
+ }
+ return new String[] {"Success"};
+ } else {
+ return new String[] {"JobID=" + jobId};
+ }
+ }
+}
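
For readers skimming ProcedureBase above: the return value depends on Flink's table.dml-sync option. A hedged sketch of the caller-side difference, reusing the placeholder catalog from the example near the top of this mail:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class DmlSyncExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql("CREATE CATALOG paimon WITH ('type'='paimon', 'warehouse'='/tmp/paimon')");
        tEnv.useCatalog("paimon");

        // Synchronous: execute(...) blocks on getJobExecutionResult() and the
        // procedure result row reads "Success".
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
        tEnv.executeSql("CALL compact('default.T')").print();

        // Asynchronous (the default): the job is only submitted and the
        // procedure result row reads "JobID=<job id>".
        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, false);
        tEnv.executeSql("CALL compact('default.T')").print();
    }
}
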
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
index 8e9a02435..e90450afc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureUtil.java
@@ -18,13 +18,14 @@
package org.apache.paimon.flink.procedure;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.action.CompactActionFactory;
+
import org.apache.flink.table.procedures.Procedure;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
/** Utility methods for {@link Procedure}. */
@@ -33,13 +34,21 @@ public class ProcedureUtil {
private ProcedureUtil() {}
private static final List<String> SYSTEM_PROCEDURES = new ArrayList<>();
- private static final Map<String, Procedure> SYSTEM_PROCEDURES_MAP = new HashMap<>();
+
+ static {
+ SYSTEM_PROCEDURES.add(CompactActionFactory.IDENTIFIER);
+ }
public static List<String> listProcedures() {
return Collections.unmodifiableList(SYSTEM_PROCEDURES);
}
- public static Optional<Procedure> getProcedure(String procedureName) {
- return Optional.ofNullable(SYSTEM_PROCEDURES_MAP.get(procedureName));
+ public static Optional<Procedure> getProcedure(Catalog catalog, String procedureName) {
+ switch (procedureName) {
+ case CompactActionFactory.IDENTIFIER:
+ return Optional.of(new CompactProcedure(catalog));
+ default:
+ return Optional.empty();
+ }
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index f3a1c5881..0e71d076f 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -35,14 +35,21 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.SnapshotManager;
+import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -55,10 +62,8 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
protected String database;
protected String tableName;
protected String commitUser;
- protected SnapshotManager snapshotManager;
protected StreamTableWrite write;
protected StreamTableCommit commit;
- protected StreamExecutionEnvironment env;
protected Catalog catalog;
private long incrementalIdentifier;
@@ -69,11 +74,6 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
tableName = "test_table_" + UUID.randomUUID();
commitUser = UUID.randomUUID().toString();
incrementalIdentifier = 0;
- env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
- env.setParallelism(2);
- env.enableCheckpointing(1000);
- env.setRestartStrategy(RestartStrategies.noRestart());
catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(warehouse)));
}
@@ -81,11 +81,12 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
public void after() throws Exception {
if (write != null) {
write.close();
+ write = null;
}
if (commit != null) {
commit.close();
+ commit = null;
}
- env.close();
catalog.close();
}
@@ -114,6 +115,11 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
return (FileStoreTable) catalog.getTable(identifier);
}
+ protected FileStoreTable getFileStoreTable(String tableName) throws Exception {
+ Identifier identifier = Identifier.create(database, tableName);
+ return (FileStoreTable) catalog.getTable(identifier);
+ }
+
protected GenericRow rowData(Object... values) {
return GenericRow.of(values);
}
@@ -135,4 +141,45 @@ public abstract class ActionITCaseBase extends AbstractTestBase {
return result;
}
}
+
+ protected StreamExecutionEnvironment buildDefaultEnv(boolean isStreaming) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+ env.setParallelism(2);
+
+ if (isStreaming) {
+ env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ env.getCheckpointConfig().setCheckpointInterval(500);
+ } else {
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ }
+
+ return env;
+ }
+
+ protected void callProcedure(String procedureStatement, boolean isStreaming, boolean dmlSync) {
+ StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+
+ TableEnvironment tEnv;
+ if (isStreaming) {
+ tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+ tEnv.getConfig()
+ .set(
+ ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
+ Duration.ofMillis(500));
+ } else {
+ tEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode());
+ }
+
+ tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, dmlSync);
+
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');",
+ warehouse));
+ tEnv.useCatalog("PAIMON");
+
+ tEnv.executeSql(procedureStatement);
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index d2e7e5c17..a05b7c2f6 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -21,8 +21,6 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
@@ -32,10 +30,8 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.utils.SnapshotManager;
-import org.apache.flink.api.common.RuntimeExecutionMode;
-import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -62,20 +58,11 @@ public class CompactActionITCase extends CompactActionITCaseBase {
@Test
@Timeout(60)
public void testBatchCompact() throws Exception {
- Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.WRITE_ONLY.key(), "true");
-
FileStoreTable table =
- createFileStoreTable(
- ROW_TYPE,
+ prepareTable(
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
- options);
- snapshotManager = table.snapshotManager();
- StreamWriteBuilder streamWriteBuilder =
- table.newStreamWriteBuilder().withCommitUser(commitUser);
- write = streamWriteBuilder.newWrite();
- commit = streamWriteBuilder.newCommit();
+ Collections.singletonMap(CoreOptions.WRITE_ONLY.key(), "true"));
writeData(
rowData(1, 100, 15, BinaryString.fromString("20221208")),
@@ -87,21 +74,15 @@ public class CompactActionITCase extends CompactActionITCaseBase {
rowData(2, 100, 16, BinaryString.fromString("20221208")),
rowData(2, 100, 15, BinaryString.fromString("20221209")));
- Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- assertThat(snapshot.id()).isEqualTo(2);
- assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactAction(warehouse, database, tableName)
- .withPartitions(getSpecifiedPartitions())
- .build(env);
- env.execute();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ runAction(false);
+ } else {
+ callProcedure(false);
+ }
- snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- assertThat(snapshot.id()).isEqualTo(3);
- assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
+ checkLatestSnapshot(table, 3, Snapshot.CommitKind.COMPACT);
List<DataSplit> splits = table.newSnapshotReader().read().dataSplits();
assertThat(splits.size()).isEqualTo(3);
@@ -118,28 +99,18 @@ public class CompactActionITCase extends CompactActionITCaseBase {
@Test
public void testStreamingCompact() throws Exception {
- Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
- options.put(
- FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(),
- "1s");
- options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
- options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
+ tableOptions.put(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key(), "1");
+ tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ tableOptions.put(CoreOptions.WRITE_ONLY.key(), "true");
// test that dedicated compact job will expire snapshots
- options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
- options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
+ tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
+ tableOptions.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
FileStoreTable table =
- createFileStoreTable(
- ROW_TYPE,
- Arrays.asList("dt", "hh"),
- Arrays.asList("dt", "hh", "k"),
- options);
- snapshotManager = table.snapshotManager();
- StreamWriteBuilder streamWriteBuilder =
- table.newStreamWriteBuilder().withCommitUser(commitUser);
- write = streamWriteBuilder.newWrite();
- commit = streamWriteBuilder.newCommit();
+ prepareTable(
+ Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), tableOptions);
// base records
writeData(
@@ -147,24 +118,18 @@ public class CompactActionITCase extends CompactActionITCaseBase {
rowData(1, 100, 16, BinaryString.fromString("20221208")),
rowData(1, 100, 15, BinaryString.fromString("20221209")));
- Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- assertThat(snapshot.id()).isEqualTo(1);
- assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ checkLatestSnapshot(table, 1, Snapshot.CommitKind.APPEND);
// no full compaction has happened, so plan should be empty
StreamTableScan scan = table.newReadBuilder().newStreamScan();
TableScan.Plan plan = scan.plan();
assertThat(plan.splits()).isEmpty();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(500);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactAction(warehouse, database, tableName)
- .withPartitions(getSpecifiedPartitions())
- .build(env);
- JobClient client = env.executeAsync();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ runAction(true);
+ } else {
+ callProcedure(true);
+ }
// first full compaction
validateResult(
@@ -193,6 +158,7 @@ public class CompactActionITCase extends CompactActionITCaseBase {
60_000);
// assert dedicated compact job will expire snapshots
+ SnapshotManager snapshotManager = table.snapshotManager();
CommonTestUtils.waitUtil(
() ->
snapshotManager.latestSnapshotId() - 2
@@ -200,27 +166,18 @@ public class CompactActionITCase extends CompactActionITCaseBase {
Duration.ofSeconds(60_000),
Duration.ofSeconds(100),
String.format("Cannot validate snapshot expiration in %s milliseconds.", 60_000));
-
- client.cancel();
}
@Test
public void testUnawareBucketStreamingCompact() throws Exception {
- Map<String, String> options = new HashMap<>();
- options.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
- // test that dedicated compact job will expire snapshots
- options.put(CoreOptions.BUCKET.key(), "-1");
- options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
- options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
FileStoreTable table =
- createFileStoreTable(
- ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options);
- snapshotManager = table.snapshotManager();
- StreamWriteBuilder streamWriteBuilder =
- table.newStreamWriteBuilder().withCommitUser(commitUser);
- write = streamWriteBuilder.newWrite();
- commit = streamWriteBuilder.newCommit();
+ prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions);
// base records
writeData(
@@ -233,21 +190,16 @@ public class CompactActionITCase extends CompactActionITCaseBase {
rowData(1, 100, 16, BinaryString.fromString("20221208")),
rowData(1, 100, 15, BinaryString.fromString("20221209")));
- Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- assertThat(snapshot.id()).isEqualTo(2);
- assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
-
- FileStoreScan storeScan = table.store().newScan();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
- env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
- env.getCheckpointConfig().setCheckpointInterval(500);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactAction(warehouse, database, tableName).build(env);
- JobClient client = env.executeAsync();
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ runAction(true);
+ } else {
+ callProcedure(true);
+ }
// first compaction, snapshot will be 3
- checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+ checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
writeData(
rowData(1, 101, 15, BinaryString.fromString("20221208")),
@@ -255,27 +207,18 @@ public class CompactActionITCase extends CompactActionITCaseBase {
rowData(1, 101, 15, BinaryString.fromString("20221209")));
// second compaction, snapshot will be 5
- checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
-
- client.cancel().get();
+ checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
}
@Test
public void testUnawareBucketBatchCompact() throws Exception {
- Map<String, String> options = new HashMap<>();
- // test that dedicated compact job will expire snapshots
- options.put(CoreOptions.BUCKET.key(), "-1");
- options.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
- options.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
+ Map<String, String> tableOptions = new HashMap<>();
+ tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+ tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+ tableOptions.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2");
FileStoreTable table =
- createFileStoreTable(
- ROW_TYPE, Collections.singletonList("k"), Collections.emptyList(), options);
- snapshotManager = table.snapshotManager();
- StreamWriteBuilder streamWriteBuilder =
- table.newStreamWriteBuilder().withCommitUser(commitUser);
- write = streamWriteBuilder.newWrite();
- commit = streamWriteBuilder.newCommit();
+ prepareTable(Collections.singletonList("k"), Collections.emptyList(), tableOptions);
// base records
writeData(
@@ -288,19 +231,60 @@ public class CompactActionITCase extends CompactActionITCaseBase {
rowData(1, 100, 16, BinaryString.fromString("20221208")),
rowData(1, 100, 15, BinaryString.fromString("20221209")));
- Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
- assertThat(snapshot.id()).isEqualTo(2);
- assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
+ checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
- FileStoreScan storeScan = table.store().newScan();
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setRuntimeMode(RuntimeExecutionMode.BATCH);
- env.setParallelism(ThreadLocalRandom.current().nextInt(2) + 1);
- new CompactAction(warehouse, database, tableName).build(env);
- env.execute();
+ if (ThreadLocalRandom.current().nextBoolean()) {
+ runAction(false);
+ } else {
+ callProcedure(false);
+ }
// first compaction, snapshot will be 3.
- checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+ checkFileAndRowSize(table, 3L, 0L, 1, 6);
+ }
+
+ private FileStoreTable prepareTable(
+ List<String> partitionKeys, List<String> primaryKeys, Map<String, String> tableOptions)
+ throws Exception {
+ FileStoreTable table =
+ createFileStoreTable(ROW_TYPE, partitionKeys, primaryKeys, tableOptions);
+
+ StreamWriteBuilder streamWriteBuilder =
+ table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = streamWriteBuilder.newWrite();
+ commit = streamWriteBuilder.newCommit();
+
+ return table;
+ }
+
+ private void checkLatestSnapshot(
+ FileStoreTable table, long snapshotId, Snapshot.CommitKind commitKind) {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId());
+ assertThat(snapshot.id()).isEqualTo(snapshotId);
+ assertThat(snapshot.commitKind()).isEqualTo(commitKind);
+ }
+
+ private void runAction(boolean isStreaming) throws Exception {
+ StreamExecutionEnvironment env = buildDefaultEnv(isStreaming);
+
+ new CompactAction(warehouse, database, tableName)
+ .withPartitions(getSpecifiedPartitions())
+ .build(env);
+ if (isStreaming) {
+ env.executeAsync();
+ } else {
+ env.execute();
+ }
+ }
+
+ private void callProcedure(boolean isStreaming) {
+ callProcedure(
+ String.format(
+ "CALL compact('%s.%s', '', '', '%s', '%s')",
+ database, tableName, "dt=20221208,hh=15", "dt=20221209,hh=15"),
+ isStreaming,
+ !isStreaming);
}
private List<Map<String, String>> getSpecifiedPartitions() {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
index 0f2ada0a9..4c646444c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCaseBase.java
@@ -25,6 +25,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +36,7 @@ import static org.assertj.core.api.Assertions.assertThat;
/** Base IT cases for {@link CompactAction} and {@link CompactDatabaseAction}. */
public class CompactActionITCaseBase extends ActionITCaseBase {
+
protected void validateResult(
FileStoreTable table,
RowType rowType,
@@ -63,8 +65,10 @@ public class CompactActionITCaseBase extends ActionITCaseBase {
}
protected void checkFileAndRowSize(
- FileStoreScan scan, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
+ FileStoreTable table, Long expectedSnapshotId, Long timeout, int fileNum, long rowCount)
throws Exception {
+ SnapshotManager snapshotManager = table.snapshotManager();
+ FileStoreScan scan = table.store().newScan();
long start = System.currentTimeMillis();
while (!Objects.equals(snapshotManager.latestSnapshotId(), expectedSnapshotId)) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
index 2c112f290..ce2e72641 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java
@@ -24,7 +24,6 @@ import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
-import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
@@ -37,6 +36,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.execution.JobClient;
@@ -75,17 +75,15 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
private FileStoreTable createTable(
String databaseName,
String tableName,
- RowType rowType,
List<String> partitionKeys,
List<String> primaryKeys,
Map<String, String> options)
throws Exception {
-
Identifier identifier = Identifier.create(databaseName, tableName);
catalog.createDatabase(databaseName, true);
catalog.createTable(
identifier,
- new Schema(rowType.getFields(), partitionKeys, primaryKeys, options, ""),
+ new Schema(ROW_TYPE.getFields(), partitionKeys, primaryKeys, options, ""),
false);
return (FileStoreTable) catalog.getTable(identifier);
}
@@ -107,12 +105,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
dbName,
tableName,
- ROW_TYPE,
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
options);
tables.add(table);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -156,7 +153,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
env.execute();
for (FileStoreTable table : tables) {
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot =
table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
assertThat(snapshot.id()).isEqualTo(3);
@@ -196,12 +193,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
dbName,
tableName,
- ROW_TYPE,
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
options);
tables.add(table);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -259,7 +255,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
"+I[1, 100, 16, 20221208]"),
60_000);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -306,12 +302,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
dbName,
tableName,
- ROW_TYPE,
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
options);
newtables.add(table);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -346,7 +341,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
"+I[1, 100, 16, 20221208]"),
60_000);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -454,7 +449,6 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
dbName,
tableName,
- ROW_TYPE,
Arrays.asList("dt", "hh"),
Arrays.asList("dt", "hh", "k"),
options);
@@ -464,7 +458,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
noCompactionTables.add(table);
}
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -508,7 +502,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
env.execute();
for (FileStoreTable table : compactionTables) {
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot =
table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
@@ -523,7 +517,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
}
for (FileStoreTable table : noCompactionTables) {
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot =
table.snapshotManager().snapshot(snapshotManager.latestSnapshotId());
@@ -553,12 +547,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
database,
tableName,
- ROW_TYPE,
- Arrays.asList("k"),
+ Collections.singletonList("k"),
Collections.emptyList(),
options);
tables.add(table);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -593,16 +586,13 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
JobClient client = env.executeAsync();
for (FileStoreTable table : tables) {
- FileStoreScan storeScan = table.store().newScan();
-
- snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
commit = streamWriteBuilder.newCommit();
// first compaction, snapshot will be 3
- checkFileAndRowSize(storeScan, 3L, 30_000L, 1, 6);
+ checkFileAndRowSize(table, 3L, 30_000L, 1, 6);
writeData(
rowData(1, 101, 15, BinaryString.fromString("20221208")),
@@ -610,7 +600,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
rowData(1, 101, 15, BinaryString.fromString("20221209")));
// second compaction, snapshot will be 5
- checkFileAndRowSize(storeScan, 5L, 30_000L, 1, 9);
+ checkFileAndRowSize(table, 5L, 30_000L, 1, 9);
}
client.cancel().get();
@@ -630,12 +620,11 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
createTable(
database,
tableName,
- ROW_TYPE,
Collections.singletonList("k"),
Collections.emptyList(),
options);
tables.add(table);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
@@ -668,10 +657,8 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase {
env.execute();
for (FileStoreTable table : tables) {
- FileStoreScan storeScan = table.store().newScan();
- snapshotManager = table.snapshotManager();
// first compaction, snapshot will be 3.
- checkFileAndRowSize(storeScan, 3L, 0L, 1, 6);
+ checkFileAndRowSize(table, 3L, 0L, 1, 6);
}
}
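One pattern repeats throughout this file: the inherited mutable snapshotManager field is replaced with a local handle scoped to the table being checked, e.g.

    // before: shared field, easy to leave pointing at the wrong table
    // snapshotManager = table.snapshotManager();
    // after: one local handle per table under test
    SnapshotManager snapshotManager = table.snapshotManager();

Since these tests iterate over several FileStoreTable instances, the local keeps each assertion bound to the right table's snapshots.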
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
index ab20863e8..b250c80e2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DeleteActionITCase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.types.Row;
import org.junit.jupiter.api.BeforeEach;
@@ -79,6 +80,7 @@ public class DeleteActionITCase extends ActionITCaseBase {
action.run();
+ SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
assertThat(snapshot.id()).isEqualTo(2);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
@@ -156,7 +158,7 @@ public class DeleteActionITCase extends ActionITCaseBase {
Collections.emptyList(),
hasPk ? Collections.singletonList("k") : Collections.emptyList(),
options);
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
index c5c4c941b..e477adfed 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/DropPartitionActionITCase.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.SnapshotManager;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -61,6 +62,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
Collections.emptyMap())
.run();
+ SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -112,6 +114,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
Collections.emptyMap())
.run();
+ SnapshotManager snapshotManager = getFileStoreTable(tableName).snapshotManager();
Snapshot snapshot =
snapshotManager.snapshot(snapshotManager.latestSnapshotId());
assertThat(snapshot.id()).isEqualTo(5);
assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
@@ -153,7 +156,7 @@ public class DropPartitionActionITCase extends ActionITCaseBase {
? Arrays.asList("partKey0", "partKey1", "dt")
: Collections.emptyList(),
new HashMap<>());
- snapshotManager = table.snapshotManager();
+ SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = streamWriteBuilder.newWrite();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
index 38555d47d..97c8637a5 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RollbackToActionITCase.java
@@ -56,7 +56,6 @@ public class RollbackToActionITCase extends ActionITCaseBase {
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());
- snapshotManager = table.snapshotManager();
StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();
@@ -83,7 +82,6 @@ public class RollbackToActionITCase extends ActionITCaseBase {
Collections.emptyList(),
Collections.singletonList("k"),
Collections.emptyMap());
- snapshotManager = table.snapshotManager();
StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
write = writeBuilder.newWrite();
commit = writeBuilder.newCommit();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
similarity index 86%
rename from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
rename to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
index a4e2424f0..97bfea0a1 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/OrderRewriteActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionITCase.java
@@ -18,19 +18,13 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.options.CatalogOptions;
-import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
@@ -44,7 +38,6 @@ import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
import java.util.ArrayList;
import java.util.Arrays;
@@ -54,13 +47,10 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
/** Order Rewrite Action tests for {@link SortCompactAction}. */
-public class OrderRewriteActionITCase extends ActionITCaseBase {
+public class SortCompactActionITCase extends ActionITCaseBase {
private static final Random random = new Random();
- private Catalog catalog;
- @TempDir private java.nio.file.Path path;
-
private void prepareData(int size, int loop) throws Exception {
createTable();
List<CommitMessage> commitMessages = new ArrayList<>();
@@ -217,45 +207,43 @@ public class OrderRewriteActionITCase extends ActionITCaseBase {
}
private void zorder(List<String> columns) throws Exception {
- SortCompactAction sortCompactAction =
- new SortCompactAction(
- new Path(path.toUri()).toUri().toString(),
- "my_db",
- "Orders1",
- Collections.emptyMap());
- sortCompactAction.withOrderStrategy("zorder");
- sortCompactAction.withOrderColumns(columns);
- sortCompactAction.run();
+ if (random.nextBoolean()) {
+ new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+ .withOrderStrategy("zorder")
+ .withOrderColumns(columns)
+ .run();
+ } else {
+ callProcedure("zorder", columns);
+ }
}
private void order(List<String> columns) throws Exception {
- SortCompactAction sortCompactAction =
- new SortCompactAction(
- new Path(path.toUri()).toUri().toString(),
- "my_db",
- "Orders1",
- Collections.emptyMap());
- sortCompactAction.withOrderStrategy("order");
- sortCompactAction.withOrderColumns(columns);
- sortCompactAction.run();
+ if (random.nextBoolean()) {
+ new SortCompactAction(warehouse, database, tableName, Collections.emptyMap())
+ .withOrderStrategy("order")
+ .withOrderColumns(columns)
+ .run();
+ } else {
+ callProcedure("order", columns);
+ }
}
- public Catalog getCatalog() {
- if (catalog == null) {
- Options options = new Options();
- options.set(CatalogOptions.WAREHOUSE, new Path(path.toUri()).toUri().toString());
- catalog = CatalogFactory.createCatalog(CatalogContext.create(options));
- }
- return catalog;
+ private void callProcedure(String orderStrategy, List<String> orderByColumns) {
+ callProcedure(
+ String.format(
+ "CALL compact('%s.%s', '%s', '%s')",
+ database, tableName, orderStrategy, String.join(",", orderByColumns)),
+ false,
+ true);
}
public void createTable() throws Exception {
- getCatalog().createDatabase("my_db", true);
- getCatalog().createTable(identifier(), schema(), true);
+ catalog.createDatabase(database, true);
+ catalog.createTable(identifier(), schema(), true);
}
public Identifier identifier() {
- return Identifier.create("my_db", "Orders1");
+ return Identifier.create(database, tableName);
}
private void commit(List<CommitMessage> messages) throws Exception {
@@ -299,7 +287,7 @@ public class OrderRewriteActionITCase extends ActionITCaseBase {
}
public Table getTable() throws Exception {
- return getCatalog().getTable(identifier());
+ return catalog.getTable(identifier());
}
private static List<CommitMessage> writeOnce(Table table, int p, int size) throws Exception {
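The renamed suite now randomly exercises either the programmatic SortCompactAction or the SQL procedure on each run, so both entry points share one set of assertions. For the sort variants, the procedure's second and third arguments carry the strategy and the comma-joined columns rather than being empty; an illustrative call (table and column names assumed, tEnv as in the earlier sketch):

    // zorder-compact two columns of default.T via the procedure
    tEnv.executeSql("CALL compact('default.T', 'zorder', 'f0,f1')").await();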
diff --git a/paimon-flink/pom.xml b/paimon-flink/pom.xml
index e92bb526b..8daa819ee 100644
--- a/paimon-flink/pom.xml
+++ b/paimon-flink/pom.xml
@@ -39,6 +39,7 @@ under the License.
<module>paimon-flink-1.15</module>
<module>paimon-flink-1.16</module>
<module>paimon-flink-1.17</module>
+ <module>paimon-flink-1.18</module>
<module>paimon-flink-action</module>
<module>paimon-flink-cdc</module>
</modules>
diff --git
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index e075d7e21..8893960ea 100644
---
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -441,7 +441,7 @@ public class HiveCatalog extends AbstractCatalog {
}
@Override
- protected String warehouse() {
+ public String warehouse() {
return warehouse;
}
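Widening warehouse() from protected to public lets callers outside the catalog hierarchy, such as the new CompactProcedure, recover the warehouse path needed to construct an action. A sketch of that wiring (the actual plumbing lives in ProcedureBase in this commit; the cast below is assumed for brevity):

    // assumes the procedure's catalog is an AbstractCatalog subclass
    String warehouse = ((AbstractCatalog) catalog).warehouse();
    new CompactAction(warehouse, database, tableName).build(env);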