This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f76d0dde4 [core] Remove Path.getPath (#2379)
f76d0dde4 is described below

commit f76d0dde4c947fd55e40d8536db494534b441b8d
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 23 17:06:43 2023 +0800

    [core] Remove Path.getPath (#2379)
---
 .../src/main/java/org/apache/paimon/fs/Path.java   |  9 -----
 .../org/apache/paimon/fs/local/LocalFileIO.java    |  2 +-
 .../org/apache/paimon/catalog/CatalogUtils.java    | 10 ++----
 .../org/apache/paimon/schema/SchemaManager.java    | 10 +++---
 .../apache/paimon/format/FileFormatSuffixTest.java |  2 +-
 .../paimon/table/FileStoreTableTestBase.java       | 12 ++++---
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  3 +-
 .../apache/paimon/table/SchemaEvolutionTest.java   |  2 +-
 .../paimon/table/source/StartupModeTest.java       |  2 +-
 .../cdc/CdcMultiplexRecordChannelComputerTest.java |  2 +-
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  2 +-
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  2 +-
 .../paimon/hive/mapred/PaimonOutputCommitter.java  | 42 ++++++++++------------
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 12 +++----
 .../sql/CreateAndDeleteTagProcedureTest.scala      |  2 +-
 .../paimon/spark/sql/RollbackProcedureTest.scala   |  2 +-
 .../paimon/spark/procedure/CompactProcedure.java   |  2 +-
 .../apache/paimon/spark/PaimonCDCSourceTest.scala  |  6 ++--
 .../org/apache/paimon/spark/PaimonSinkTest.scala   | 12 +++----
 .../org/apache/paimon/spark/PaimonSourceTest.scala |  2 +-
 .../spark/procedure/CompactProcedureTest.scala     |  6 ++--
 .../CreateAndDeleteTagProcedureTest.scala          |  2 +-
 .../spark/procedure/RollbackProcedureTest.scala    |  2 +-
 .../paimon/spark/sql/DataFrameWriteTest.scala      |  6 ++--
 24 files changed, 71 insertions(+), 83 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
index 87a9ce7b0..02a46be77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
@@ -299,15 +299,6 @@ public class Path implements Comparable<Path>, 
Serializable {
         return uri;
     }
 
-    /**
-     * Return full path.
-     *
-     * @return full path
-     */
-    public String getPath() {
-        return uri.getPath();
-    }
-
     /**
      * Returns the final component of this path, i.e., everything that follows 
the last separator.
      *
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index f1d941c6f..55d264f1c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -222,7 +222,7 @@ public class LocalFileIO implements FileIO {
      */
     public File toFile(Path path) {
         // remove scheme
-        String localPath = path.getPath();
+        String localPath = path.toUri().getPath();
         checkState(localPath != null, "Cannot convert a null path to File");
 
         if (localPath.length() == 0) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 41660abe6..ec263f90a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -32,16 +32,12 @@ public class CatalogUtils {
         return String.format("%s/%s.db/%s", warehouse, database, table);
     }
 
-    public static String warehouse(Path path) {
-        return path.getParent().getParent().getPath();
-    }
-
     public static String warehouse(String path) {
-        return new Path(path).getParent().getParent().getPath();
+        return new Path(path).getParent().getParent().toString();
     }
 
     public static String database(Path path) {
-        return SchemaManager.fromPath(path.getPath(), false).getDatabaseName();
+        return SchemaManager.fromPath(path.toString(), 
false).getDatabaseName();
     }
 
     public static String database(String path) {
@@ -49,7 +45,7 @@ public class CatalogUtils {
     }
 
     public static String table(Path path) {
-        return SchemaManager.fromPath(path.getPath(), false).getObjectName();
+        return SchemaManager.fromPath(path.toString(), false).getObjectName();
     }
 
     public static String table(String path) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 42f136c1a..01b395a84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -161,7 +161,7 @@ public class SchemaManager implements Serializable {
                     latest().orElseThrow(
                                     () ->
                                             new Catalog.TableNotExistException(
-                                                    
fromPath(tableRoot.getPath(), true)));
+                                                    
fromPath(tableRoot.toString(), true)));
             Map<String, String> newOptions = new HashMap<>(schema.options());
             List<DataField> newFields = new ArrayList<>(schema.fields());
             AtomicInteger highestFieldId = new 
AtomicInteger(schema.highestFieldId());
@@ -179,7 +179,7 @@ public class SchemaManager implements Serializable {
                     SchemaChange.Move move = addColumn.move();
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(addColumn.fieldName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(tableRoot.getPath(), true), 
addColumn.fieldName());
+                                fromPath(tableRoot.toString(), true), 
addColumn.fieldName());
                     }
                     Preconditions.checkArgument(
                             addColumn.dataType().isNullable(),
@@ -214,7 +214,7 @@ public class SchemaManager implements Serializable {
                     validateNotPrimaryAndPartitionKey(schema, 
rename.fieldName());
                     if (newFields.stream().anyMatch(f -> 
f.name().equals(rename.newName()))) {
                         throw new Catalog.ColumnAlreadyExistException(
-                                fromPath(tableRoot.getPath(), true), 
rename.fieldName());
+                                fromPath(tableRoot.toString(), true), 
rename.fieldName());
                     }
 
                     updateNestedColumn(
@@ -233,7 +233,7 @@ public class SchemaManager implements Serializable {
                     if (!newFields.removeIf(
                             f -> f.name().equals(((DropColumn) 
change).fieldName()))) {
                         throw new Catalog.ColumnNotExistException(
-                                fromPath(tableRoot.getPath(), true), 
drop.fieldName());
+                                fromPath(tableRoot.toString(), true), 
drop.fieldName());
                     }
                     if (newFields.isEmpty()) {
                         throw new IllegalArgumentException("Cannot drop all 
fields in table");
@@ -424,7 +424,7 @@ public class SchemaManager implements Serializable {
         }
         if (!found) {
             throw new Catalog.ColumnNotExistException(
-                    fromPath(tableRoot.getPath(), true), 
Arrays.toString(updateFieldNames));
+                    fromPath(tableRoot.toString(), true), 
Arrays.toString(updateFieldNames));
         }
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 2c24e7135..79672df2c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -60,7 +60,7 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
         String format = "avro";
         KeyValueFileWriterFactory writerFactory = 
createWriterFactory(tempDir.toString(), format);
         Path path = writerFactory.pathFactory(0).newPath();
-        assertThat(path.getPath().endsWith(format)).isTrue();
+        assertThat(path.toString().endsWith(format)).isTrue();
 
         DataFilePathFactory dataFilePathFactory =
                 new DataFilePathFactory(new Path(tempDir.toString()), "dt=1", 
1, format);
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index a9d3732de..ec6f959be 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -704,7 +704,8 @@ public abstract class FileStoreTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new 
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                        .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(15);
         // table-path
         // table-path/snapshot
@@ -755,7 +756,8 @@ public abstract class FileStoreTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new 
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                        .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(16);
         // case 0 plus 1:
         // table-path/tag/tag-test3
@@ -795,7 +797,8 @@ public abstract class FileStoreTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new 
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                        .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(23);
         // case 0 plus 7:
         // table-path/manifest/manifest-list-2
@@ -847,7 +850,8 @@ public abstract class FileStoreTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new 
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                        .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(16);
         // rollback snapshot case 0 plus 1:
         // table-path/tag/tag-test1
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index e8115754d..4863053e1 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -490,7 +490,8 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
     @Test
     public void testStreamingChangelogCompatibility02() throws Exception {
         // already contains 2 commits
-        CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip", 
tablePath.getPath());
+        CompatibilityTestUtils.unzip(
+                "compatibility/table-changelog-0.2.zip", 
tablePath.toUri().getPath());
         FileStoreTable table =
                 createFileStoreTable(
                         conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER, 
ChangelogProducer.INPUT),
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index bdf1c1f80..0a92024fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -76,7 +76,7 @@ public class SchemaEvolutionTest {
     @BeforeEach
     public void beforeEach() {
         tablePath = new Path(tempDir.toUri());
-        identifier = SchemaManager.fromPath(tablePath.getPath(), true);
+        identifier = SchemaManager.fromPath(tablePath.toString(), true);
         schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
         commitUser = UUID.randomUUID().toString();
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 9ce46e77b..048911152 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -276,7 +276,7 @@ public class StartupModeTest extends ScannerTestBase {
     private void initializeTable(
             CoreOptions.StartupMode startupMode, Map<String, String> 
properties) throws Exception {
         Options options = new Options();
-        options.set(PATH, tablePath.getPath());
+        options.set(PATH, tablePath.toString());
         options.set(CoreOptions.SCAN_MODE, startupMode);
         for (Map.Entry<String, String> property : properties.entrySet()) {
             options.set(property.getKey(), property.getValue());
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index f756fa1a7..c20cc4440 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -68,7 +68,7 @@ public class CdcMultiplexRecordChannelComputerTest {
         tableWithoutPartition = Identifier.create(databaseName, "test_table2");
 
         Options catalogOptions = new Options();
-        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.toString());
         catalogOptions.set(CatalogOptions.URI, "");
         catalogLoader = () -> 
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
         catalog = catalogLoader.load();
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 2036d351c..ce7d76287 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -354,7 +354,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
 
     private Options createCatalogOptions(Path warehouse) {
         Options conf = new Options();
-        conf.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+        conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
         conf.set(CatalogOptions.URI, "");
 
         return conf;
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 2833524c9..9d0a15fe2 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -674,7 +674,7 @@ class StoreMultiCommitterTest {
 
     private Options createCatalogOptions(Path warehouse) {
         Options conf = new Options();
-        conf.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+        conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
         conf.set(CatalogOptions.URI, "");
 
         return conf;
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
index a247ec75f..305a9f18b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
@@ -28,7 +28,6 @@ import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.OutputCommitter;
@@ -96,7 +95,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
                 createPreCommitFile(
                         commitTables,
                         generatePreCommitFileLocation(
-                                table.location().getPath(),
+                                table.location(),
                                 attemptID.getJobID(),
                                 attemptID.getTaskID().getId()),
                         table.fileIO());
@@ -141,7 +140,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
         if (table != null) {
             BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
             List<CommitMessage> commitMessagesList =
-                    getAllPreCommitMessage(table.location().getPath(), 
jobContext, table.fileIO());
+                    getAllPreCommitMessage(table.location(), jobContext, 
table.fileIO());
             try (BatchTableCommit batchTableCommit = 
batchWriteBuilder.newCommit()) {
                 batchTableCommit.commit(commitMessagesList);
             } catch (Exception e) {
@@ -149,7 +148,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
             }
             deleteTemporaryFile(
                     jobContext,
-                    generateJobLocation(table.location().getPath(), 
jobContext.getJobID()));
+                    generateJobLocation(table.location(), 
jobContext.getJobID()),
+                    table.fileIO());
         } else {
             LOG.info("CommitJob not found table, Skipping job commit.");
         }
@@ -167,7 +167,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
 
             LOG.info("AbortJob {} has started", jobContext.getJobID());
             List<CommitMessage> commitMessagesList =
-                    getAllPreCommitMessage(table.location().getPath(), 
jobContext, table.fileIO());
+                    getAllPreCommitMessage(table.location(), jobContext, 
table.fileIO());
             BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
             try (BatchTableCommit batchTableCommit = 
batchWriteBuilder.newCommit()) {
                 batchTableCommit.abort(commitMessagesList);
@@ -176,7 +176,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
             }
             deleteTemporaryFile(
                     jobContext,
-                    generateJobLocation(table.location().getPath(), 
jobContext.getJobID()));
+                    generateJobLocation(table.location(), 
jobContext.getJobID()),
+                    table.fileIO());
             LOG.info("Job {} is aborted. preCommit file has deleted", 
jobContext.getJobID());
         }
     }
@@ -186,18 +187,13 @@ public class PaimonOutputCommitter extends 
OutputCommitter {
      *
      * @param jobContext The job context
      * @param location The locations to clean up
-     * @throws IOException if there is a failure deleting the files
      */
-    private void deleteTemporaryFile(JobContext jobContext, String location) 
throws IOException {
-        JobConf jobConf = jobContext.getJobConf();
-
+    private void deleteTemporaryFile(JobContext jobContext, Path location, 
FileIO fileIO) {
         LOG.info("Deleting temporary file for job {} started", 
jobContext.getJobID());
 
         LOG.info("The deleted file is located in : {}", location);
         try {
-            org.apache.hadoop.fs.Path deleteFilePath = new 
org.apache.hadoop.fs.Path(location);
-            FileSystem fs = deleteFilePath.getFileSystem(jobConf);
-            fs.delete(deleteFilePath, true);
+            fileIO.delete(location, true);
         } catch (IOException e) {
             LOG.debug("Failed to delete directory {} ", location, e);
         }
@@ -213,7 +209,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
      * @return The list of the committed data files
      */
     private static List<CommitMessage> getAllPreCommitMessage(
-            String location, JobContext jobContext, FileIO io) {
+            Path location, JobContext jobContext, FileIO io) {
         JobConf conf = jobContext.getJobConf();
 
         int totalCommitMessagesSize =
@@ -222,7 +218,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
         List<CommitMessage> commitMessagesList = 
Collections.synchronizedList(new ArrayList<>());
 
         for (int i = 0; i < totalCommitMessagesSize; i++) {
-            String commitFileLocation =
+            Path commitFileLocation =
                     generatePreCommitFileLocation(location, 
jobContext.getJobID(), i);
             commitMessagesList.addAll(readPreCommitFile(commitFileLocation, 
io));
         }
@@ -237,8 +233,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
      * @param jobId The JobID for the task
      * @return The file to store the results
      */
-    static String generateJobLocation(String location, JobID jobId) {
-        return location + "/temp/" + jobId;
+    static Path generateJobLocation(Path location, JobID jobId) {
+        return new Path(new Path(location, "temp"), jobId.toString());
     }
 
     /**
@@ -252,8 +248,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
      * @param taskId taskId
      * @return The location of preCommit file path
      */
-    private static String generatePreCommitFileLocation(String location, JobID 
jobId, int taskId) {
-        return generateJobLocation(location, jobId) + "/task_" + taskId + 
PRE_COMMIT;
+    private static Path generatePreCommitFileLocation(Path location, JobID 
jobId, int taskId) {
+        return new Path(generateJobLocation(location, jobId), "task_" + taskId 
+ PRE_COMMIT);
     }
 
     /**
@@ -262,16 +258,16 @@ public class PaimonOutputCommitter extends 
OutputCommitter {
      * file's location @Param io The FileIO of the table.
      */
     private static void createPreCommitFile(
-            List<CommitMessage> commitTables, String location, FileIO io) 
throws IOException {
+            List<CommitMessage> commitTables, Path location, FileIO io) throws 
IOException {
         try (ObjectOutputStream objectOutputStream =
-                new ObjectOutputStream(io.newOutputStream(new Path(location), 
true))) {
+                new ObjectOutputStream(io.newOutputStream(location, true))) {
             objectOutputStream.writeObject(commitTables);
         }
     }
 
-    private static List<CommitMessage> readPreCommitFile(String location, 
FileIO io) {
+    private static List<CommitMessage> readPreCommitFile(Path location, FileIO 
io) {
         try (ObjectInputStream objectInputStream =
-                new ObjectInputStream(io.newInputStream(new Path(location)))) {
+                new ObjectInputStream(io.newInputStream(location))) {
             return (List<CommitMessage>) objectInputStream.readObject();
         } catch (ClassNotFoundException | IOException e) {
             throw new RuntimeException(
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 36df428a5..f42fa2d37 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -37,7 +37,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
@@ -81,7 +81,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
@@ -122,7 +122,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (city String, population Long)
                        |TBLPROPERTIES ('write-mode'='append-only', 
'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData.toDS
@@ -166,7 +166,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           intercept[RuntimeException] {
@@ -190,7 +190,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                      |CREATE TABLE T (start Timestamp, stockId INT, avg_price 
DOUBLE)
                      |TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
                      |""".stripMargin)
-        val location = loadTable("T").location().getPath
+        val location = loadTable("T").location().toString
 
         val inputData = MemoryStream[(Long, Int, Double)]
         val data = inputData.toDS
@@ -241,7 +241,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val date = Date.valueOf("2023-08-10")
           spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, 
'2023-08-09')")
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
index 44b76c8ad..30e112ab5 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
@@ -36,7 +36,7 @@ class CreateAndDeleteTagProcedureTest extends 
PaimonSparkTestBase with StreamTes
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
index 647f29853..d110e68bf 100644
--- a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -36,7 +36,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with 
StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 
'write-mode'='change-log', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 5cae15cd2..7239efad3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -137,7 +137,7 @@ public class CompactProcedure extends BaseProcedure {
             }
         }
 
-        Dataset<Row> row = 
spark().read().format("paimon").load(coreOptions.path().getPath());
+        Dataset<Row> row = 
spark().read().format("paimon").load(coreOptions.path().toString());
         row = StringUtils.isBlank(filter) ? row : row.where(filter);
         new WriteIntoPaimonTable(
                         table,
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
index 9c1de89c4..8782aebeb 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -43,7 +43,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with 
StreamTest {
         spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
 
         val table = loadTable(tableName)
-        val location = table.location().getPath
+        val location = table.location().toString
 
         val readStream = spark.readStream
           .format("paimon")
@@ -94,7 +94,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
         spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
 
         val table = loadTable(tableName)
-        val location = table.location().getPath
+        val location = table.location().toString
 
         val readStream = spark.readStream
           .format("paimon")
@@ -146,7 +146,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with StreamTest {
                      |""".stripMargin)
 
         val table = loadTable(tableName)
-        val location = table.location().getPath
+        val location = table.location().toString
 
         // streaming write
         val inputData = MemoryStream[(Int, String)]
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 75a6738d0..8e0eb6b7e 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -37,7 +37,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
@@ -81,7 +81,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
@@ -122,7 +122,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (city String, population Long)
                        |TBLPROPERTIES ('bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData.toDS
@@ -166,7 +166,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           intercept[RuntimeException] {
@@ -190,7 +190,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                      |CREATE TABLE T (start Timestamp, stockId INT, avg_price DOUBLE)
                      |TBLPROPERTIES ('bucket'='3')
                      |""".stripMargin)
-        val location = loadTable("T").location().getPath
+        val location = loadTable("T").location().toString
 
         val inputData = MemoryStream[(Long, Int, Double)]
         val data = inputData.toDS
@@ -241,7 +241,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val date = Date.valueOf("2023-08-10")
           spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2, '2023-08-09')")
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index 60aa7e1fd..f9f5038d4 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -731,7 +731,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with StreamTest {
                  |TBLPROPERTIES ($primaryKeysProp 'bucket'='2', 'file.format'='parquet')
                  |""".stripMargin)
     val table = loadTable(tableName)
-    val location = table.location().getPath
+    val location = table.location().toString
 
     val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
     val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int, String)]
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
index 0e56be021..5e826237b 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -40,7 +40,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b INT)
                        |TBLPROPERTIES ('bucket'='-1')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, Int)]
           val stream = inputData
@@ -117,7 +117,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
                        |TBLPROPERTIES ('bucket'='-1')
                        |PARTITIONED BY (p)
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, Int, Int)]
           val stream = inputData
@@ -213,7 +213,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b INT)
                        |TBLPROPERTIES ('primary-key'='a,b', 'bucket'='1')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, Int)]
           val stream = inputData
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 463173ced..f9f559273 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -36,7 +36,7 @@ class CreateAndDeleteTagProcedureTest extends PaimonSparkTestBase with StreamTes
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 2b77058a0..71075f201 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -36,7 +36,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest {
                        |CREATE TABLE T (a INT, b STRING)
                        |TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
                        |""".stripMargin)
-          val location = loadTable("T").location().getPath
+          val location = loadTable("T").location().toString
 
           val inputData = MemoryStream[(Int, String)]
           val stream = inputData
diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index c6613b991..5e7493629 100644
--- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -46,7 +46,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
                          |""".stripMargin)
 
             val paimonTable = loadTable("T")
-            val location = paimonTable.location().getPath
+            val location = paimonTable.location().toString
 
             val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
             df1.write.format("paimon").mode("append").save(location)
@@ -92,7 +92,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
                          |""".stripMargin)
 
             val paimonTable = loadTable("T")
-            val location = paimonTable.location().getPath
+            val location = paimonTable.location().toString
 
             val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
             df1.write.format("paimon").mode("append").save(location)
@@ -181,7 +181,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
                          |""".stripMargin)
 
             val paimonTable = loadTable("T")
-            val location = paimonTable.location().getPath
+            val location = paimonTable.location().toString
 
             val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
             df1.write.format("paimon").mode("append").save(location)


Reply via email to