This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ebef05034 [flink] change env.fromData to fromElements for
compatibility with flink 1.18 and lower versions (#4541)
ebef05034 is described below
commit ebef050348de90f91193314079a6fb08e785c985
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Nov 19 20:19:36 2024 +0800
[flink] change env.fromData to fromElements for compatibility with flink
1.18 and lower versions (#4541)
---
.../procedure/RemoveOrphanFilesProcedure.java | 62 +++-
.../flink}/RemoveOrphanFilesActionITCase.java | 108 +++----
.../flink/RemoveOrphanFilesActionITCase.java | 25 ++
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 2 +-
.../action/RemoveOrphanFilesActionITCase.java | 323 +--------------------
...java => RemoveOrphanFilesActionITCaseBase.java} | 4 +-
6 files changed, 114 insertions(+), 410 deletions(-)
diff --git
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index c5fa7b7ba..7695c510b 100644
---
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -20,9 +20,12 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.orphan.FlinkOrphanFilesClean;
+import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.flink.table.procedure.ProcedureContext;
+import java.util.Locale;
+
import static org.apache.paimon.operation.OrphanFilesClean.createFileCleaner;
import static org.apache.paimon.operation.OrphanFilesClean.olderThanMillis;
@@ -66,20 +69,57 @@ public class RemoveOrphanFilesProcedure extends
ProcedureBase {
boolean dryRun,
Integer parallelism)
throws Exception {
+ return call(procedureContext, tableId, olderThan, dryRun, parallelism,
null);
+ }
+
+ /**
+ * Removes orphan files for {@code tableId} using the selected execution mode.
+ *
+ * @param procedureContext context whose execution environment is used in DISTRIBUTED mode
+ * @param tableId identifier string parsed by {@code Identifier.fromString} into a database
+ * name and an object (table) name
+ * @param olderThan threshold string converted by {@code olderThanMillis}
+ * @param dryRun forwarded to {@code createFileCleaner(catalog, dryRun)}
+ * @param parallelism parallelism forwarded to the selected clean implementation
+ * @param mode "DISTRIBUTED" (used when null) or "LOCAL", matched case-insensitively
+ * @return a single-element array holding the number of deleted files as a string
+ * @throws RuntimeException wrapping any exception raised while cleaning, including the
+ * IllegalArgumentException for an unknown {@code mode}
+ */
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String olderThan,
+ boolean dryRun,
+ Integer parallelism,
+ String mode)
+ throws Exception {
Identifier identifier = Identifier.fromString(tableId);
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getObjectName();
-
- long deleted =
- FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
- procedureContext.getExecutionEnvironment(),
- catalog,
- olderThanMillis(olderThan),
- createFileCleaner(catalog, dryRun),
- parallelism,
- databaseName,
- tableName);
- return new String[] {String.valueOf(deleted)};
+ // A null mode (e.g. from the 5-argument overload) falls back to distributed cleaning.
+ if (mode == null) {
+ mode = "DISTRIBUTED";
+ }
+ long deletedFiles;
+ try {
+ // Locale.ROOT keeps the mode match independent of the JVM default locale.
+ switch (mode.toUpperCase(Locale.ROOT)) {
+ case "DISTRIBUTED":
+ deletedFiles =
+ FlinkOrphanFilesClean.executeDatabaseOrphanFiles(
+ procedureContext.getExecutionEnvironment(),
+ catalog,
+ olderThanMillis(olderThan),
+ createFileCleaner(catalog, dryRun),
+ parallelism,
+ databaseName,
+ tableName);
+ break;
+ case "LOCAL":
+ deletedFiles =
+ LocalOrphanFilesClean.executeDatabaseOrphanFiles(
+ catalog,
+ databaseName,
+ tableName,
+ olderThanMillis(olderThan),
+ createFileCleaner(catalog, dryRun),
+ parallelism);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "Unknown mode: "
+ + mode
+ + ". Only 'DISTRIBUTED' and 'LOCAL' are
supported.");
+ }
+ return new String[] {String.valueOf(deletedFiles)};
+ } catch (Exception e) {
+ // Also wraps the IllegalArgumentException above, so callers see a RuntimeException
+ // whose cause carries the "Unknown mode" message.
+ throw new RuntimeException(e);
+ }
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
similarity index 71%
copy from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
copy to
paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
index 938a8ce1b..46b62b6bf 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -16,11 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.flink.action;
+package org.apache.paimon.flink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.RemoveOrphanFilesAction;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
@@ -40,8 +42,7 @@ import
org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Arrays;
@@ -54,9 +55,8 @@ import static
org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
-/** IT cases for {@link RemoveOrphanFilesAction}. */
+/** IT cases for {@link RemoveOrphanFilesAction} in Flink 1.18. */
public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
-
private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
@@ -96,9 +96,8 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
return new Path(table.location(), orphanFile);
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRunWithoutException(boolean isNamedArgument) throws
Exception {
+ @Test
+ public void testRunWithoutException() throws Exception {
createTableAndWriteData(tableName);
List<String> args =
@@ -120,40 +119,29 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
assertThatCode(action2::run).doesNotThrowAnyException();
String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- tableName);
+ String.format("CALL sys.remove_orphan_files('%s.%s')",
database, tableName);
+
CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
String withDryRun =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true)",
- database,
- tableName);
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true)",
+ database, tableName);
ImmutableList<Row> actualDryRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
String withOlderThan =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- tableName);
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59')",
+ database, tableName);
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument)
throws Exception {
+ @Test
+ public void testRemoveDatabaseOrphanFilesITCase() throws Exception {
createTableAndWriteData("tableName1");
createTableAndWriteData("tableName2");
@@ -181,12 +169,7 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
assertThatCode(action3::run).doesNotThrowAnyException();
String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- "*");
+ String.format("CALL sys.remove_orphan_files('%s.%s')",
database, "*");
CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
@@ -197,29 +180,22 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
String withDryRun =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true)",
- database,
- "*");
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true)",
+ database, "*");
ImmutableList<Row> actualDryRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDryRun));
assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
String withOlderThan =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- "*");
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59')",
+ database, "*");
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
assertThat(actualDeleteFile).containsOnly(Row.of("4"));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
+ @Test
+ public void testCleanWithBranch() throws Exception {
// create main branch
FileStoreTable table = createTableAndWriteData(tableName);
@@ -263,18 +239,14 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
}
String procedure =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- "*");
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59')",
+ database, "*");
ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(procedure));
assertThat(actualDeleteFile).containsOnly(Row.of("4"));
}
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRunWithMode(boolean isNamedArgument) throws Exception {
+ @Test
+ public void testRunWithMode() throws Exception {
createTableAndWriteData(tableName);
List<String> args =
@@ -296,44 +268,30 @@ public class RemoveOrphanFilesActionITCase extends
ActionITCaseBase {
assertThatCode(action2::run).doesNotThrowAnyException();
String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- tableName);
+ String.format("CALL sys.remove_orphan_files('%s.%s')",
database, tableName);
CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
String withLocalMode =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'local')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'local')",
- database,
- tableName);
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true, 5, 'local')",
+ database, tableName);
ImmutableList<Row> actualLocalRunDeleteFile =
ImmutableList.copyOf(executeSQL(withLocalMode));
assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
String withDistributedMode =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'distributed')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'distributed')",
- database,
- tableName);
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true, 5, 'distributed')",
+ database, tableName);
ImmutableList<Row> actualDistributedRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDistributedMode));
assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
String withInvalidMode =
String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'unknown')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'unknown')",
- database,
- tableName);
+ "CALL sys.remove_orphan_files('%s.%s', '2999-12-31
23:59:59', true, 5, 'unknown')",
+ database, tableName);
assertThatCode(() -> executeSQL(withInvalidMode))
.isInstanceOf(RuntimeException.class)
.hasMessageContaining("Unknown mode");
diff --git
a/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
new file mode 100644
index 000000000..e1be410b8
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.19/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.flink.action.RemoveOrphanFilesAction;
+import org.apache.paimon.flink.action.RemoveOrphanFilesActionITCaseBase;
+
+/**
+ * IT cases for {@link RemoveOrphanFilesAction} in Flink 1.19; all test methods are inherited
+ * from {@link RemoveOrphanFilesActionITCaseBase}.
+ */
+public class RemoveOrphanFilesActionITCase extends
RemoveOrphanFilesActionITCaseBase {}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index f50414620..61bebca24 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -279,7 +279,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean
{
});
if (deletedInLocal.get() != 0) {
- deleted = deleted.union(env.fromData(deletedInLocal.get()));
+ deleted = deleted.union(env.fromElements(deletedInLocal.get()));
}
return deleted;
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
index 938a8ce1b..a92e529aa 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
@@ -18,324 +18,5 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.table.sink.StreamTableCommit;
-import org.apache.paimon.table.sink.StreamTableWrite;
-import org.apache.paimon.table.sink.StreamWriteBuilder;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
-
-import org.apache.flink.types.Row;
-import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
-import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.assertThatCode;
-
-/** IT cases for {@link RemoveOrphanFilesAction}. */
-public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
-
- private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
- private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";
-
- private FileStoreTable createTableAndWriteData(String tableName) throws
Exception {
- RowType rowType =
- RowType.of(
- new DataType[] {DataTypes.BIGINT(),
DataTypes.STRING()},
- new String[] {"k", "v"});
-
- FileStoreTable table =
- createFileStoreTable(
- tableName,
- rowType,
- Collections.emptyList(),
- Collections.singletonList("k"),
- Collections.emptyList(),
- Collections.emptyMap());
-
- StreamWriteBuilder writeBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
- write = writeBuilder.newWrite();
- commit = writeBuilder.newCommit();
-
- writeData(rowData(1L, BinaryString.fromString("Hi")));
-
- Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1);
- Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2);
-
- FileIO fileIO = table.fileIO();
- fileIO.writeFile(orphanFile1, "a", true);
- Thread.sleep(2000);
- fileIO.writeFile(orphanFile2, "b", true);
-
- return table;
- }
-
- private Path getOrphanFilePath(FileStoreTable table, String orphanFile) {
- return new Path(table.location(), orphanFile);
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRunWithoutException(boolean isNamedArgument) throws
Exception {
- createTableAndWriteData(tableName);
-
- List<String> args =
- new ArrayList<>(
- Arrays.asList(
- "remove_orphan_files",
- "--warehouse",
- warehouse,
- "--database",
- database,
- "--table",
- tableName));
- RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action1::run).doesNotThrowAnyException();
-
- args.add("--older_than");
- args.add("2023-12-31 23:59:59");
- RemoveOrphanFilesAction action2 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action2::run).doesNotThrowAnyException();
-
- String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- tableName);
- CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
-
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
- String withDryRun =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true)",
- database,
- tableName);
- ImmutableList<Row> actualDryRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDryRun));
- assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2"));
-
- String withOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- tableName);
- ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
-
- assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"));
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument)
throws Exception {
- createTableAndWriteData("tableName1");
- createTableAndWriteData("tableName2");
-
- List<String> args =
- new ArrayList<>(
- Arrays.asList(
- "remove_orphan_files",
- "--warehouse",
- warehouse,
- "--database",
- database,
- "--table",
- "*"));
- RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action1::run).doesNotThrowAnyException();
-
- args.add("--older_than");
- args.add("2023-12-31 23:59:59");
- RemoveOrphanFilesAction action2 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action2::run).doesNotThrowAnyException();
-
- args.add("--parallelism");
- args.add("5");
- RemoveOrphanFilesAction action3 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action3::run).doesNotThrowAnyException();
-
- String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- "*");
- CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
-
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
- String withParallelism =
- String.format("CALL
sys.remove_orphan_files('%s.%s','',true,5)", database, "*");
- CloseableIterator<Row> withParallelismCollect =
executeSQL(withParallelism);
-
assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0"));
-
- String withDryRun =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true)"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true)",
- database,
- "*");
- ImmutableList<Row> actualDryRunDeleteFile =
ImmutableList.copyOf(executeSQL(withDryRun));
- assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4"));
-
- String withOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- "*");
- ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(withOlderThan));
-
- assertThat(actualDeleteFile).containsOnly(Row.of("4"));
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testCleanWithBranch(boolean isNamedArgument) throws Exception {
- // create main branch
- FileStoreTable table = createTableAndWriteData(tableName);
-
- // create first branch and write some data
- table.createBranch("br");
- SchemaManager schemaManager = new SchemaManager(table.fileIO(),
table.location(), "br");
- TableSchema branchSchema =
- schemaManager.commitChanges(SchemaChange.addColumn("v2",
DataTypes.INT()));
- Options branchOptions = new Options(branchSchema.options());
- branchOptions.set(CoreOptions.BRANCH, "br");
- branchSchema = branchSchema.copy(branchOptions.toMap());
- FileStoreTable branchTable =
- FileStoreTableFactory.create(table.fileIO(), table.location(),
branchSchema);
-
- String commitUser = UUID.randomUUID().toString();
- StreamTableWrite write = branchTable.newWrite(commitUser);
- StreamTableCommit commit = branchTable.newCommit(commitUser);
- write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20));
- commit.commit(1, write.prepareCommit(false, 1));
- write.close();
- commit.close();
-
- // create orphan file in snapshot directory of first branch
- Path orphanFile3 = new Path(table.location(),
"branch/branch-br/snapshot/orphan_file3");
- branchTable.fileIO().writeFile(orphanFile3, "x", true);
-
- // create second branch, which is empty
- table.createBranch("br2");
-
- // create orphan file in snapshot directory of second branch
- Path orphanFile4 = new Path(table.location(),
"branch/branch-br2/snapshot/orphan_file4");
- branchTable.fileIO().writeFile(orphanFile4, "y", true);
-
- if (ThreadLocalRandom.current().nextBoolean()) {
- executeSQL(
- String.format(
- "ALTER TABLE `%s`.`%s` SET ('%s' = 'br')",
- database, tableName, SCAN_FALLBACK_BRANCH.key()),
- false,
- true);
- }
- String procedure =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59')",
- database,
- "*");
- ImmutableList<Row> actualDeleteFile =
ImmutableList.copyOf(executeSQL(procedure));
- assertThat(actualDeleteFile).containsOnly(Row.of("4"));
- }
-
- @ParameterizedTest
- @ValueSource(booleans = {true, false})
- public void testRunWithMode(boolean isNamedArgument) throws Exception {
- createTableAndWriteData(tableName);
-
- List<String> args =
- new ArrayList<>(
- Arrays.asList(
- "remove_orphan_files",
- "--warehouse",
- warehouse,
- "--database",
- database,
- "--table",
- tableName));
- RemoveOrphanFilesAction action1 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action1::run).doesNotThrowAnyException();
-
- args.add("--older_than");
- args.add("2023-12-31 23:59:59");
- RemoveOrphanFilesAction action2 =
createAction(RemoveOrphanFilesAction.class, args);
- assertThatCode(action2::run).doesNotThrowAnyException();
-
- String withoutOlderThan =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s')"
- : "CALL sys.remove_orphan_files('%s.%s')",
- database,
- tableName);
- CloseableIterator<Row> withoutOlderThanCollect =
executeSQL(withoutOlderThan);
-
assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0"));
-
- String withLocalMode =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'local')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'local')",
- database,
- tableName);
- ImmutableList<Row> actualLocalRunDeleteFile =
- ImmutableList.copyOf(executeSQL(withLocalMode));
- assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2"));
-
- String withDistributedMode =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'distributed')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'distributed')",
- database,
- tableName);
- ImmutableList<Row> actualDistributedRunDeleteFile =
- ImmutableList.copyOf(executeSQL(withDistributedMode));
- assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2"));
-
- String withInvalidMode =
- String.format(
- isNamedArgument
- ? "CALL sys.remove_orphan_files(`table` =>
'%s.%s', older_than => '2999-12-31 23:59:59', dry_run => true, parallelism =>
5, mode => 'unknown')"
- : "CALL sys.remove_orphan_files('%s.%s',
'2999-12-31 23:59:59', true, 5, 'unknown')",
- database,
- tableName);
- assertThatCode(() -> executeSQL(withInvalidMode))
- .isInstanceOf(RuntimeException.class)
- .hasMessageContaining("Unknown mode");
- }
-}
+/**
+ * IT cases for {@link RemoveOrphanFilesAction} in paimon-flink-common; all test methods are
+ * inherited from {@link RemoveOrphanFilesActionITCaseBase}.
+ */
+public class RemoveOrphanFilesActionITCase extends
RemoveOrphanFilesActionITCaseBase {}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
similarity index 99%
copy from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
copy to
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
index 938a8ce1b..5f874a5a7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java
@@ -54,8 +54,8 @@ import static
org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
-/** IT cases for {@link RemoveOrphanFilesAction}. */
-public class RemoveOrphanFilesActionITCase extends ActionITCaseBase {
+/** Base class for IT cases of {@link RemoveOrphanFilesAction}, subclassed per Flink module. */
+public abstract class RemoveOrphanFilesActionITCaseBase extends
ActionITCaseBase {
private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1";
private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2";