This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 12d6db57d [spark] Implement distributed orphan file clean for spark (#4207)
12d6db57d is described below

commit 12d6db57d073475510d868fd32e1b2f9f5039355
Author: Xiduo You <[email protected]>
AuthorDate: Thu Sep 19 21:40:04 2024 +0800

    [spark] Implement distributed orphan file clean for spark (#4207)
---
 .../paimon/operation/LocalOrphanFilesClean.java    |  10 +-
 .../apache/paimon/operation/OrphanFilesClean.java  |  51 +++--
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java |   6 +-
 .../procedure/RemoveOrphanFilesProcedure.java      |  47 ++---
 .../spark/orphan/SparkOrphanFilesClean.scala       | 224 +++++++++++++++++++++
 .../procedure/RemoveOrphanFilesProcedureTest.scala |  22 +-
 6 files changed, 295 insertions(+), 65 deletions(-)
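
Note on usage: the user-facing entry point is unchanged — the `sys.remove_orphan_files` procedure. What this commit changes is the execution (a distributed Spark job instead of a local thread pool) and the result schema (a single deleted-file count instead of one row per deleted path). A minimal sketch, assuming an active SparkSession with the Paimon catalog configured; the table name and timestamp are illustrative (see the test changes below for the exact expected results):

    // Dry run: reports how many orphan files would be removed, deleting nothing.
    spark.sql(
      "CALL sys.remove_orphan_files(table => 'T', older_than => '2024-09-19 00:00:00', dry_run => true)")

    // Real run: returns a single row holding the number of deleted files.
    val deleted = spark
      .sql("CALL sys.remove_orphan_files(table => 'T', older_than => '2024-09-19 00:00:00')")
      .head()
      .getLong(0)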

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 9cce1061f..d68318e96 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -53,7 +53,12 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecute;
 
-/** Local {@link OrphanFilesClean}, it will use thread pool to execute deletion. */
+/**
+ * Local {@link OrphanFilesClean}; it uses a thread pool to execute deletion.
+ *
+ * <p>Note that this class is no longer used, since each engine should implement its own
+ * distributed version. See `FlinkOrphanFilesClean` and `SparkOrphanFilesClean`.
+ */
 public class LocalOrphanFilesClean extends OrphanFilesClean {
 
     private final ThreadPoolExecutor executor;
@@ -109,8 +114,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
                 table.switchToBranch(branch).store().manifestFileFactory().create();
         try {
             List<String> manifests = new ArrayList<>();
-            collectWithoutDataFile(
-                    branch, usedFiles::add, manifest -> manifests.add(manifest.fileName()));
+            collectWithoutDataFile(branch, usedFiles::add, manifests::add);
             usedFiles.addAll(retryReadingDataFiles(manifestFile, manifests));
         } catch (IOException e) {
             throw new RuntimeException(e);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 8ce95337f..0f2bad27f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -32,6 +32,7 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.DateTimeUtils;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SerializableConsumer;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -135,16 +136,6 @@ public abstract class OrphanFilesClean implements Serializable {
         }
     }
 
-    protected void collectWithoutDataFile(
-            String branch,
-            Consumer<String> usedFileConsumer,
-            Consumer<ManifestFileMeta> manifestConsumer)
-            throws IOException {
-        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
-            collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
-        }
-    }
-
     protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException {
         FileStoreTable branchTable = table.switchToBranch(branch);
         SnapshotManager snapshotManager = branchTable.snapshotManager();
@@ -155,11 +146,34 @@ public abstract class OrphanFilesClean implements Serializable {
         return readSnapshots;
     }
 
+    protected void collectWithoutDataFile(
+            String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
+            throws IOException {
+        for (Snapshot snapshot : safelyGetAllSnapshots(branch)) {
+            collectWithoutDataFile(branch, snapshot, usedFileConsumer, manifestConsumer);
+        }
+    }
+
     protected void collectWithoutDataFile(
             String branch,
             Snapshot snapshot,
             Consumer<String> usedFileConsumer,
-            Consumer<ManifestFileMeta> manifestConsumer)
+            Consumer<String> manifestConsumer)
+            throws IOException {
+        Consumer<Pair<String, Boolean>> usedFileWithFlagConsumer =
+                fileAndFlag -> {
+                    if (fileAndFlag.getRight()) {
+                        manifestConsumer.accept(fileAndFlag.getLeft());
+                    }
+                    usedFileConsumer.accept(fileAndFlag.getLeft());
+                };
+        collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileWithFlagConsumer);
+    }
+
+    protected void collectWithoutDataFileWithManifestFlag(
+            String branch,
+            Snapshot snapshot,
+            Consumer<Pair<String, Boolean>> usedFileWithFlagConsumer)
             throws IOException {
         FileStoreTable branchTable = table.switchToBranch(branch);
         ManifestList manifestList = branchTable.store().manifestListFactory().create();
@@ -167,7 +181,7 @@ public abstract class OrphanFilesClean implements Serializable {
         List<ManifestFileMeta> manifestFileMetas = new ArrayList<>();
         // changelog manifest
         if (snapshot.changelogManifestList() != null) {
-            usedFileConsumer.accept(snapshot.changelogManifestList());
+            usedFileWithFlagConsumer.accept(Pair.of(snapshot.changelogManifestList(), false));
             manifestFileMetas.addAll(
                     retryReadingFiles(
                             () ->
@@ -178,7 +192,7 @@ public abstract class OrphanFilesClean implements Serializable {
 
         // delta manifest
         if (snapshot.deltaManifestList() != null) {
-            usedFileConsumer.accept(snapshot.deltaManifestList());
+            usedFileWithFlagConsumer.accept(Pair.of(snapshot.deltaManifestList(), false));
             manifestFileMetas.addAll(
                     retryReadingFiles(
                             () -> manifestList.readWithIOException(snapshot.deltaManifestList()),
@@ -186,7 +200,7 @@ public abstract class OrphanFilesClean implements Serializable {
         }
 
         // base manifest
-        usedFileConsumer.accept(snapshot.baseManifestList());
+        usedFileWithFlagConsumer.accept(Pair.of(snapshot.baseManifestList(), false));
         manifestFileMetas.addAll(
                 retryReadingFiles(
                         () -> manifestList.readWithIOException(snapshot.baseManifestList()),
@@ -194,26 +208,25 @@ public abstract class OrphanFilesClean implements Serializable {
 
         // collect manifests
         for (ManifestFileMeta manifest : manifestFileMetas) {
-            manifestConsumer.accept(manifest);
-            usedFileConsumer.accept(manifest.fileName());
+            usedFileWithFlagConsumer.accept(Pair.of(manifest.fileName(), true));
         }
 
         // index files
         String indexManifest = snapshot.indexManifest();
         if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
-            usedFileConsumer.accept(indexManifest);
+            usedFileWithFlagConsumer.accept(Pair.of(indexManifest, false));
             retryReadingFiles(
                             () -> indexFileHandler.readManifestWithIOException(indexManifest),
                             Collections.<IndexManifestEntry>emptyList())
                     .stream()
                     .map(IndexManifestEntry::indexFile)
                     .map(IndexFileMeta::fileName)
-                    .forEach(usedFileConsumer);
+                    .forEach(name -> usedFileWithFlagConsumer.accept(Pair.of(name, false)));
         }
 
         // statistic file
         if (snapshot.statistics() != null) {
-            usedFileConsumer.accept(snapshot.statistics());
+            usedFileWithFlagConsumer.accept(Pair.of(snapshot.statistics(), false));
         }
     }
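
The refactor above is the heart of this change: `collectWithoutDataFileWithManifestFlag` emits every used file name exactly once as a `Pair<fileName, isManifestFile>`, and the old two-consumer overload is rebuilt on top of it. A minimal sketch of consuming the flagged stream from an engine-side subclass (a fragment only — the method is protected, and `branch`/`snapshot` are assumed in scope; buffer names are illustrative):

    import scala.collection.mutable.ArrayBuffer

    val usedFiles = ArrayBuffer.empty[String] // every file referenced by the snapshot
    val manifests = ArrayBuffer.empty[String] // manifest files, whose data files are read later
    collectWithoutDataFileWithManifestFlag(
      branch,
      snapshot,
      pair => {
        usedFiles += pair.getLeft
        if (pair.getRight) manifests += pair.getLeft // the flag marks manifest files
      })

This is what lets `SparkOrphanFilesClean` below keep manifest names in a cached Dataset and read their data files in a separate distributed stage.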
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index 6740e8980..f50414620 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -27,7 +27,6 @@ import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFile;
-import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -134,11 +133,10 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
                                             throws Exception {
                                         String branch = branchAndSnapshot.f0;
                                         Snapshot snapshot = Snapshot.fromJson(branchAndSnapshot.f1);
-                                        Consumer<ManifestFileMeta> manifestConsumer =
+                                        Consumer<String> manifestConsumer =
                                                 manifest -> {
                                                     Tuple2<String, String> tuple2 =
-                                                            new Tuple2<>(
-                                                                    branch, manifest.fileName());
+                                                            new Tuple2<>(branch, manifest);
                                                     ctx.output(manifestOutputTag, tuple2);
                                                 };
                                         collectWithoutDataFile(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index 7a3c8df4d..4f442fbae 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -19,9 +19,9 @@
 package org.apache.paimon.spark.procedure;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.operation.LocalOrphanFilesClean;
 import org.apache.paimon.operation.OrphanFilesClean;
 import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.spark.orphan.SparkOrphanFilesClean;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -29,17 +29,12 @@ import org.apache.spark.sql.connector.catalog.TableCatalog;
 import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
-import org.apache.spark.unsafe.types.UTF8String;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import static org.apache.paimon.operation.LocalOrphanFilesClean.executeOrphanFilesClean;
 import static org.apache.spark.sql.types.DataTypes.BooleanType;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
+import static org.apache.spark.sql.types.DataTypes.LongType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
 
 /**
@@ -67,7 +62,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
     private static final StructType OUTPUT_TYPE =
             new StructType(
                     new StructField[] {
-                        new StructField("result", StringType, true, Metadata.empty())
+                        new StructField("result", LongType, true, Metadata.empty())
                     });
 
     private RemoveOrphanFilesProcedure(TableCatalog tableCatalog) {
@@ -102,29 +97,19 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
         }
         LOG.info("identifier is {}.", identifier);
 
-        List<LocalOrphanFilesClean> tableCleans;
-        try {
-            Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
-            tableCleans =
-                    LocalOrphanFilesClean.createOrphanFilesCleans(
-                            catalog,
-                            identifier.getDatabaseName(),
-                            identifier.getObjectName(),
-                            OrphanFilesClean.olderThanMillis(
-                                    args.isNullAt(1) ? null : args.getString(1)),
-                            OrphanFilesClean.createFileCleaner(
-                                    catalog, !args.isNullAt(2) && args.getBoolean(2)),
-                            args.isNullAt(3) ? null : args.getInt(3));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-
-        String[] result = executeOrphanFilesClean(tableCleans);
-        List<InternalRow> rows = new ArrayList<>();
-        Arrays.stream(result)
-                .forEach(line -> rows.add(newInternalRow(UTF8String.fromString(line))));
-
-        return rows.toArray(new InternalRow[0]);
+        Catalog catalog = ((WithPaimonCatalog) tableCatalog()).paimonCatalog();
+        long deletedFiles =
+                SparkOrphanFilesClean.executeDatabaseOrphanFiles(
+                        catalog,
+                        identifier.getDatabaseName(),
+                        identifier.getTableName(),
+                        OrphanFilesClean.olderThanMillis(
+                                args.isNullAt(1) ? null : args.getString(1)),
+                        OrphanFilesClean.createFileCleaner(
+                                catalog, !args.isNullAt(2) && args.getBoolean(2)),
+                        args.isNullAt(3) ? null : args.getInt(3));
+
+        return new InternalRow[] {newInternalRow(deletedFiles)};
     }
 
     public static ProcedureBuilder builder() {
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
new file mode 100644
index 000000000..d79105e24
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.orphan
+
+import org.apache.paimon.{utils, Snapshot}
+import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.fs.Path
+import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
+import org.apache.paimon.operation.OrphanFilesClean
+import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.utils.SerializableConsumer
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.functions.sum
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicLong
+import java.util.function.Consumer
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
+
+case class SparkOrphanFilesClean(
+    specifiedTable: FileStoreTable,
+    specifiedOlderThanMillis: Long,
+    specifiedFileCleaner: SerializableConsumer[Path],
+    parallelism: Int,
+    @transient spark: SparkSession)
+  extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
+  with SQLConfHelper
+  with Logging {
+
+  def doOrphanClean(): (Dataset[Long], Dataset[BranchAndManifestFile]) = {
+    import spark.implicits._
+
+    val branches = validBranches()
+    val deletedInLocal = new AtomicLong(0)
+    // snapshot and changelog files are the root of everything, so they are handled specially
+    // here, and subsequently, we will not count their orphan files.
+    cleanSnapshotDir(branches, (_: Path) => deletedInLocal.incrementAndGet)
+
+    val maxBranchParallelism = Math.min(branches.size(), parallelism)
+    // find snapshots using branch and find manifests (manifest, index, statistics) using snapshot
+    val usedManifestFiles = spark.sparkContext
+      .parallelize(branches.asScala, maxBranchParallelism)
+      .mapPartitions(_.flatMap {
+        branch => safelyGetAllSnapshots(branch).asScala.map(snapshot => (branch, snapshot.toJson))
+      })
+      .repartition(parallelism)
+      .flatMap {
+        case (branch, snapshotJson) =>
+          val usedFileBuffer = new ArrayBuffer[BranchAndManifestFile]()
+          val usedFileConsumer =
+            new Consumer[org.apache.paimon.utils.Pair[String, java.lang.Boolean]] {
+              override def accept(pair: utils.Pair[String, java.lang.Boolean]): Unit = {
+                usedFileBuffer.append(BranchAndManifestFile(branch, pair.getLeft, pair.getRight))
+              }
+            }
+          val snapshot = Snapshot.fromJson(snapshotJson)
+          collectWithoutDataFileWithManifestFlag(branch, snapshot, usedFileConsumer)
+          usedFileBuffer
+      }
+      .toDS()
+      .cache()
+
+    // find all data files
+    val dataFiles = usedManifestFiles
+      .filter(_.isManifestFile)
+      .distinct()
+      .mapPartitions {
+        it =>
+          val branchManifests = new util.HashMap[String, ManifestFile]
+          it.flatMap {
+            branchAndManifestFile =>
+              val manifestFile = branchManifests.computeIfAbsent(
+                branchAndManifestFile.branch,
+                (key: String) =>
+                  specifiedTable.switchToBranch(key).store.manifestFileFactory.create)
+
+              retryReadingFiles(
+                () => manifestFile.readWithIOException(branchAndManifestFile.manifestName),
+                Collections.emptyList[ManifestEntry]
+              ).asScala.flatMap {
+                manifestEntry =>
+                  manifestEntry.fileName() +: manifestEntry.file().extraFiles().asScala
+              }
+          }
+      }
+
+    // union manifest and data files
+    val usedFiles = usedManifestFiles
+      .map(_.manifestName)
+      .union(dataFiles)
+      .toDF("used_name")
+
+    // find candidate files which can be removed
+    val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString)
+    val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism)
+    val candidates = spark.sparkContext
+      .parallelize(fileDirs, maxFileDirsParallelism)
+      .flatMap {
+        dir =>
+          tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
+            file => (file.getPath.getName, file.getPath.toUri.toString)
+          }
+      }
+      .toDF("name", "path")
+      .repartition(parallelism)
+
+    // use a left anti join to keep only the files that are not used
+    val deleted = candidates
+      .join(usedFiles, $"name" === $"used_name", "left_anti")
+      .mapPartitions {
+        it =>
+          var deleted = 0L
+          while (it.hasNext) {
+            val pathToClean = it.next().getString(1)
+            specifiedFileCleaner.accept(new Path(pathToClean))
+            logInfo(s"Cleaned file: $pathToClean")
+            deleted += 1
+          }
+          logInfo(s"Total cleaned files: $deleted")
+          Iterator.single(deleted)
+      }
+    val finalDeletedDataset = if (deletedInLocal.get() != 0) {
+      deleted.union(spark.createDataset(Seq(deletedInLocal.get())))
+    } else {
+      deleted
+    }
+
+    (finalDeletedDataset, usedManifestFiles)
+  }
+}
+
+/**
+ * @param branch
+ *   The branch name
+ * @param manifestName
+ *   The manifest file name, including manifest-list, manifest, index-manifest, statistics
+ * @param isManifestFile
+ *   Whether it is a manifest file
+ */
+case class BranchAndManifestFile(branch: String, manifestName: String, isManifestFile: Boolean)
+
+object SparkOrphanFilesClean extends SQLConfHelper {
+  def executeDatabaseOrphanFiles(
+      catalog: Catalog,
+      databaseName: String,
+      tableName: String,
+      olderThanMillis: Long,
+      fileCleaner: SerializableConsumer[Path],
+      parallelismOpt: Integer): Long = {
+    val spark = SparkSession.active
+    val parallelism = if (parallelismOpt == null) {
+      Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
+    } else {
+      parallelismOpt.intValue()
+    }
+
+    val tableNames = if (tableName == null || "*" == tableName) {
+      catalog.listTables(databaseName).asScala
+    } else {
+      tableName :: Nil
+    }
+    val tables = tableNames.map {
+      tableName =>
+        val identifier = new Identifier(databaseName, tableName)
+        val table = catalog.getTable(identifier)
+        assert(
+          table.isInstanceOf[FileStoreTable],
+          s"Only FileStoreTable supports remove-orphan-files action. The table type is '${table.getClass.getName}'.")
+        table.asInstanceOf[FileStoreTable]
+    }
+    if (tables.isEmpty) {
+      return 0
+    }
+    val (deleted, waitToRelease) = tables.map {
+      table =>
+        new SparkOrphanFilesClean(
+          table,
+          olderThanMillis,
+          fileCleaner,
+          parallelism,
+          spark
+        ).doOrphanClean()
+    }.unzip
+    try {
+      val result = deleted
+        .reduce((l, r) => l.union(r))
+        .toDF("deleted")
+        .agg(sum("deleted"))
+        .head()
+      assert(result.schema.size == 1, result.schema)
+      if (result.isNullAt(0)) {
+        // no files can be deleted
+        0
+      } else {
+        result.getLong(0)
+      }
+    } finally {
+      waitToRelease.foreach(_.unpersist())
+    }
+  }
+}
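
The procedure delegates to `SparkOrphanFilesClean.executeDatabaseOrphanFiles`, which can also be called directly. A hedged sketch — the warehouse path and database name are illustrative, the `CatalogFactory`/`CatalogContext` wiring is one plausible way to obtain a `Catalog`, and the second argument of `createFileCleaner` is the dry-run flag, mirroring the procedure above:

    import org.apache.paimon.catalog.{CatalogContext, CatalogFactory}
    import org.apache.paimon.fs.Path
    import org.apache.paimon.operation.OrphanFilesClean
    import org.apache.paimon.spark.orphan.SparkOrphanFilesClean

    // Illustrative wiring: a filesystem catalog over a warehouse path.
    val catalog = CatalogFactory.createCatalog(CatalogContext.create(new Path("/tmp/warehouse")))
    val deleted = SparkOrphanFilesClean.executeDatabaseOrphanFiles(
      catalog,
      "test", // database name
      "*", // all tables in the database
      OrphanFilesClean.olderThanMillis("2024-09-19 00:00:00"), // only files older than this
      OrphanFilesClean.createFileCleaner(catalog, false), // false = really delete (not a dry run)
      null) // parallelism: null falls back to Spark defaults
    println(s"Deleted $deleted orphan files")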
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index 23a014d0f..c414515f1 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -52,7 +52,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.tryToWriteAtomic(orphanFile2, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
 
     val orphanFile2ModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -63,7 +63,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1')"),
-      Row(orphanFile1.toUri.getPath) :: Nil)
+      Row(1) :: Nil)
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -71,7 +71,9 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2')"),
-      Row(orphanFile2.toUri.getPath) :: Nil)
+      Row(1) :: Nil)
+
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
   }
 
   test("Paimon procedure: dry run remove orphan files") {
@@ -95,7 +97,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO.writeFile(orphanFile2, "b", true)
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
 
     val older_than = DateTimeUtils.formatLocalDateTime(
       DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
@@ -104,8 +106,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     checkAnswer(
       spark.sql(
        s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than', dry_run => true)"),
-      Row(orphanFile1.toUri.getPath) :: Row(orphanFile2.toUri.getPath) :: Nil
+      Row(2) :: Nil
     )
+
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0) :: Nil)
   }
 
   test("Paimon procedure: remove database orphan files") {
@@ -142,7 +146,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
     fileIO2.tryToWriteAtomic(orphanFile22, "b")
 
     // by default, no file deleted
-    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Nil)
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
 
     val orphanFile12ModTime = fileIO1.getFileStatus(orphanFile12).getModificationTime
     val older_than1 = DateTimeUtils.formatLocalDateTime(
@@ -153,7 +157,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than1')"),
-      Row(orphanFile11.toUri.getPath) :: Row(orphanFile21.toUri.getPath) :: Nil
+      Row(2) :: Nil
     )
 
     val older_than2 = DateTimeUtils.formatLocalDateTime(
@@ -162,8 +166,10 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
 
     checkAnswer(
       spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*', older_than => '$older_than2')"),
-      Row(orphanFile12.toUri.getPath) :: Row(orphanFile22.toUri.getPath) :: Nil
+      Row(2) :: Nil
     )
+
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'test.*')"), Row(0) :: Nil)
   }
 
 }
