This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f76d0dde4 [core] Remove Path.getPath (#2379)
f76d0dde4 is described below
commit f76d0dde4c947fd55e40d8536db494534b441b8d
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Nov 23 17:06:43 2023 +0800
[core] Remove Path.getPath (#2379)
---
.../src/main/java/org/apache/paimon/fs/Path.java | 9 -----
.../org/apache/paimon/fs/local/LocalFileIO.java | 2 +-
.../org/apache/paimon/catalog/CatalogUtils.java | 10 ++----
.../org/apache/paimon/schema/SchemaManager.java | 10 +++---
.../apache/paimon/format/FileFormatSuffixTest.java | 2 +-
.../paimon/table/FileStoreTableTestBase.java | 12 ++++---
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 3 +-
.../apache/paimon/table/SchemaEvolutionTest.java | 2 +-
.../paimon/table/source/StartupModeTest.java | 2 +-
.../cdc/CdcMultiplexRecordChannelComputerTest.java | 2 +-
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 2 +-
.../paimon/flink/sink/StoreMultiCommitterTest.java | 2 +-
.../paimon/hive/mapred/PaimonOutputCommitter.java | 42 ++++++++++------------
.../org/apache/paimon/spark/PaimonSinkTest.scala | 12 +++----
.../sql/CreateAndDeleteTagProcedureTest.scala | 2 +-
.../paimon/spark/sql/RollbackProcedureTest.scala | 2 +-
.../paimon/spark/procedure/CompactProcedure.java | 2 +-
.../apache/paimon/spark/PaimonCDCSourceTest.scala | 6 ++--
.../org/apache/paimon/spark/PaimonSinkTest.scala | 12 +++----
.../org/apache/paimon/spark/PaimonSourceTest.scala | 2 +-
.../spark/procedure/CompactProcedureTest.scala | 6 ++--
.../CreateAndDeleteTagProcedureTest.scala | 2 +-
.../spark/procedure/RollbackProcedureTest.scala | 2 +-
.../paimon/spark/sql/DataFrameWriteTest.scala | 6 ++--
24 files changed, 71 insertions(+), 83 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
index 87a9ce7b0..02a46be77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/Path.java
@@ -299,15 +299,6 @@ public class Path implements Comparable<Path>,
Serializable {
return uri;
}
- /**
- * Return full path.
- *
- * @return full path
- */
- public String getPath() {
- return uri.getPath();
- }
-
/**
* Returns the final component of this path, i.e., everything that follows
the last separator.
*
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index f1d941c6f..55d264f1c 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -222,7 +222,7 @@ public class LocalFileIO implements FileIO {
*/
public File toFile(Path path) {
// remove scheme
- String localPath = path.getPath();
+ String localPath = path.toUri().getPath();
checkState(localPath != null, "Cannot convert a null path to File");
if (localPath.length() == 0) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 41660abe6..ec263f90a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -32,16 +32,12 @@ public class CatalogUtils {
return String.format("%s/%s.db/%s", warehouse, database, table);
}
- public static String warehouse(Path path) {
- return path.getParent().getParent().getPath();
- }
-
public static String warehouse(String path) {
- return new Path(path).getParent().getParent().getPath();
+ return new Path(path).getParent().getParent().toString();
}
public static String database(Path path) {
- return SchemaManager.fromPath(path.getPath(), false).getDatabaseName();
+ return SchemaManager.fromPath(path.toString(),
false).getDatabaseName();
}
public static String database(String path) {
@@ -49,7 +45,7 @@ public class CatalogUtils {
}
public static String table(Path path) {
- return SchemaManager.fromPath(path.getPath(), false).getObjectName();
+ return SchemaManager.fromPath(path.toString(), false).getObjectName();
}
public static String table(String path) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 42f136c1a..01b395a84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -161,7 +161,7 @@ public class SchemaManager implements Serializable {
latest().orElseThrow(
() ->
new Catalog.TableNotExistException(
-
fromPath(tableRoot.getPath(), true)));
+
fromPath(tableRoot.toString(), true)));
Map<String, String> newOptions = new HashMap<>(schema.options());
List<DataField> newFields = new ArrayList<>(schema.fields());
AtomicInteger highestFieldId = new
AtomicInteger(schema.highestFieldId());
@@ -179,7 +179,7 @@ public class SchemaManager implements Serializable {
SchemaChange.Move move = addColumn.move();
if (newFields.stream().anyMatch(f ->
f.name().equals(addColumn.fieldName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(tableRoot.getPath(), true),
addColumn.fieldName());
+ fromPath(tableRoot.toString(), true),
addColumn.fieldName());
}
Preconditions.checkArgument(
addColumn.dataType().isNullable(),
@@ -214,7 +214,7 @@ public class SchemaManager implements Serializable {
validateNotPrimaryAndPartitionKey(schema,
rename.fieldName());
if (newFields.stream().anyMatch(f ->
f.name().equals(rename.newName()))) {
throw new Catalog.ColumnAlreadyExistException(
- fromPath(tableRoot.getPath(), true),
rename.fieldName());
+ fromPath(tableRoot.toString(), true),
rename.fieldName());
}
updateNestedColumn(
@@ -233,7 +233,7 @@ public class SchemaManager implements Serializable {
if (!newFields.removeIf(
f -> f.name().equals(((DropColumn)
change).fieldName()))) {
throw new Catalog.ColumnNotExistException(
- fromPath(tableRoot.getPath(), true),
drop.fieldName());
+ fromPath(tableRoot.toString(), true),
drop.fieldName());
}
if (newFields.isEmpty()) {
throw new IllegalArgumentException("Cannot drop all
fields in table");
@@ -424,7 +424,7 @@ public class SchemaManager implements Serializable {
}
if (!found) {
throw new Catalog.ColumnNotExistException(
- fromPath(tableRoot.getPath(), true),
Arrays.toString(updateFieldNames));
+ fromPath(tableRoot.toString(), true),
Arrays.toString(updateFieldNames));
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 2c24e7135..79672df2c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -60,7 +60,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
String format = "avro";
KeyValueFileWriterFactory writerFactory =
createWriterFactory(tempDir.toString(), format);
Path path = writerFactory.pathFactory(0).newPath();
- assertThat(path.getPath().endsWith(format)).isTrue();
+ assertThat(path.toString().endsWith(format)).isTrue();
DataFilePathFactory dataFilePathFactory =
new DataFilePathFactory(new Path(tempDir.toString()), "dt=1",
1, format);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index a9d3732de..ec6f959be 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -704,7 +704,8 @@ public abstract class FileStoreTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+ Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ .collect(Collectors.toList());
assertThat(files.size()).isEqualTo(15);
// table-path
// table-path/snapshot
@@ -755,7 +756,8 @@ public abstract class FileStoreTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+ Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ .collect(Collectors.toList());
assertThat(files.size()).isEqualTo(16);
// case 0 plus 1:
// table-path/tag/tag-test3
@@ -795,7 +797,8 @@ public abstract class FileStoreTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+ Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ .collect(Collectors.toList());
assertThat(files.size()).isEqualTo(23);
// case 0 plus 7:
// table-path/manifest/manifest-list-2
@@ -847,7 +850,8 @@ public abstract class FileStoreTableTestBase {
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
List<java.nio.file.Path> files =
- Files.walk(new
File(tablePath.getPath()).toPath()).collect(Collectors.toList());
+ Files.walk(new File(tablePath.toUri().getPath()).toPath())
+ .collect(Collectors.toList());
assertThat(files.size()).isEqualTo(16);
// rollback snapshot case 0 plus 1:
// table-path/tag/tag-test1
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index e8115754d..4863053e1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -490,7 +490,8 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
@Test
public void testStreamingChangelogCompatibility02() throws Exception {
// already contains 2 commits
- CompatibilityTestUtils.unzip("compatibility/table-changelog-0.2.zip",
tablePath.getPath());
+ CompatibilityTestUtils.unzip(
+ "compatibility/table-changelog-0.2.zip",
tablePath.toUri().getPath());
FileStoreTable table =
createFileStoreTable(
conf -> conf.set(CoreOptions.CHANGELOG_PRODUCER,
ChangelogProducer.INPUT),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index bdf1c1f80..0a92024fb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -76,7 +76,7 @@ public class SchemaEvolutionTest {
@BeforeEach
public void beforeEach() {
tablePath = new Path(tempDir.toUri());
- identifier = SchemaManager.fromPath(tablePath.getPath(), true);
+ identifier = SchemaManager.fromPath(tablePath.toString(), true);
schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
commitUser = UUID.randomUUID().toString();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
index 9ce46e77b..048911152 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/StartupModeTest.java
@@ -276,7 +276,7 @@ public class StartupModeTest extends ScannerTestBase {
private void initializeTable(
CoreOptions.StartupMode startupMode, Map<String, String>
properties) throws Exception {
Options options = new Options();
- options.set(PATH, tablePath.getPath());
+ options.set(PATH, tablePath.toString());
options.set(CoreOptions.SCAN_MODE, startupMode);
for (Map.Entry<String, String> property : properties.entrySet()) {
options.set(property.getKey(), property.getValue());
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
index f756fa1a7..c20cc4440 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcMultiplexRecordChannelComputerTest.java
@@ -68,7 +68,7 @@ public class CdcMultiplexRecordChannelComputerTest {
tableWithoutPartition = Identifier.create(databaseName, "test_table2");
Options catalogOptions = new Options();
- catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+ catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.toString());
catalogOptions.set(CatalogOptions.URI, "");
catalogLoader = () ->
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
catalog = catalogLoader.load();
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 2036d351c..ce7d76287 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -354,7 +354,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
private Options createCatalogOptions(Path warehouse) {
Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+ conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
conf.set(CatalogOptions.URI, "");
return conf;
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 2833524c9..9d0a15fe2 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -674,7 +674,7 @@ class StoreMultiCommitterTest {
private Options createCatalogOptions(Path warehouse) {
Options conf = new Options();
- conf.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+ conf.set(CatalogOptions.WAREHOUSE, warehouse.toString());
conf.set(CatalogOptions.URI, "");
return conf;
diff --git
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
index a247ec75f..305a9f18b 100644
---
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
+++
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/mapred/PaimonOutputCommitter.java
@@ -28,7 +28,6 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobContext;
import org.apache.hadoop.mapred.OutputCommitter;
@@ -96,7 +95,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
createPreCommitFile(
commitTables,
generatePreCommitFileLocation(
- table.location().getPath(),
+ table.location(),
attemptID.getJobID(),
attemptID.getTaskID().getId()),
table.fileIO());
@@ -141,7 +140,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
if (table != null) {
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
List<CommitMessage> commitMessagesList =
- getAllPreCommitMessage(table.location().getPath(),
jobContext, table.fileIO());
+ getAllPreCommitMessage(table.location(), jobContext,
table.fileIO());
try (BatchTableCommit batchTableCommit =
batchWriteBuilder.newCommit()) {
batchTableCommit.commit(commitMessagesList);
} catch (Exception e) {
@@ -149,7 +148,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
}
deleteTemporaryFile(
jobContext,
- generateJobLocation(table.location().getPath(),
jobContext.getJobID()));
+ generateJobLocation(table.location(),
jobContext.getJobID()),
+ table.fileIO());
} else {
LOG.info("CommitJob not found table, Skipping job commit.");
}
@@ -167,7 +167,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
LOG.info("AbortJob {} has started", jobContext.getJobID());
List<CommitMessage> commitMessagesList =
- getAllPreCommitMessage(table.location().getPath(),
jobContext, table.fileIO());
+ getAllPreCommitMessage(table.location(), jobContext,
table.fileIO());
BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder();
try (BatchTableCommit batchTableCommit =
batchWriteBuilder.newCommit()) {
batchTableCommit.abort(commitMessagesList);
@@ -176,7 +176,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
}
deleteTemporaryFile(
jobContext,
- generateJobLocation(table.location().getPath(),
jobContext.getJobID()));
+ generateJobLocation(table.location(),
jobContext.getJobID()),
+ table.fileIO());
LOG.info("Job {} is aborted. preCommit file has deleted",
jobContext.getJobID());
}
}
@@ -186,18 +187,13 @@ public class PaimonOutputCommitter extends
OutputCommitter {
*
* @param jobContext The job context
* @param location The locations to clean up
- * @throws IOException if there is a failure deleting the files
*/
- private void deleteTemporaryFile(JobContext jobContext, String location)
throws IOException {
- JobConf jobConf = jobContext.getJobConf();
-
+ private void deleteTemporaryFile(JobContext jobContext, Path location,
FileIO fileIO) {
LOG.info("Deleting temporary file for job {} started",
jobContext.getJobID());
LOG.info("The deleted file is located in : {}", location);
try {
- org.apache.hadoop.fs.Path deleteFilePath = new
org.apache.hadoop.fs.Path(location);
- FileSystem fs = deleteFilePath.getFileSystem(jobConf);
- fs.delete(deleteFilePath, true);
+ fileIO.delete(location, true);
} catch (IOException e) {
LOG.debug("Failed to delete directory {} ", location, e);
}
@@ -213,7 +209,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
* @return The list of the committed data files
*/
private static List<CommitMessage> getAllPreCommitMessage(
- String location, JobContext jobContext, FileIO io) {
+ Path location, JobContext jobContext, FileIO io) {
JobConf conf = jobContext.getJobConf();
int totalCommitMessagesSize =
@@ -222,7 +218,7 @@ public class PaimonOutputCommitter extends OutputCommitter {
List<CommitMessage> commitMessagesList =
Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < totalCommitMessagesSize; i++) {
- String commitFileLocation =
+ Path commitFileLocation =
generatePreCommitFileLocation(location,
jobContext.getJobID(), i);
commitMessagesList.addAll(readPreCommitFile(commitFileLocation,
io));
}
@@ -237,8 +233,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
* @param jobId The JobID for the task
* @return The file to store the results
*/
- static String generateJobLocation(String location, JobID jobId) {
- return location + "/temp/" + jobId;
+ static Path generateJobLocation(Path location, JobID jobId) {
+ return new Path(new Path(location, "temp"), jobId.toString());
}
/**
@@ -252,8 +248,8 @@ public class PaimonOutputCommitter extends OutputCommitter {
* @param taskId taskId
* @return The location of preCommit file path
*/
- private static String generatePreCommitFileLocation(String location, JobID
jobId, int taskId) {
- return generateJobLocation(location, jobId) + "/task_" + taskId +
PRE_COMMIT;
+ private static Path generatePreCommitFileLocation(Path location, JobID
jobId, int taskId) {
+ return new Path(generateJobLocation(location, jobId), "task_" + taskId
+ PRE_COMMIT);
}
/**
@@ -262,16 +258,16 @@ public class PaimonOutputCommitter extends
OutputCommitter {
* file's location @Param io The FileIO of the table.
*/
private static void createPreCommitFile(
- List<CommitMessage> commitTables, String location, FileIO io)
throws IOException {
+ List<CommitMessage> commitTables, Path location, FileIO io) throws
IOException {
try (ObjectOutputStream objectOutputStream =
- new ObjectOutputStream(io.newOutputStream(new Path(location),
true))) {
+ new ObjectOutputStream(io.newOutputStream(location, true))) {
objectOutputStream.writeObject(commitTables);
}
}
- private static List<CommitMessage> readPreCommitFile(String location,
FileIO io) {
+ private static List<CommitMessage> readPreCommitFile(Path location, FileIO
io) {
try (ObjectInputStream objectInputStream =
- new ObjectInputStream(io.newInputStream(new Path(location)))) {
+ new ObjectInputStream(io.newInputStream(location))) {
return (List<CommitMessage>) objectInputStream.readObject();
} catch (ClassNotFoundException | IOException e) {
throw new RuntimeException(
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 36df428a5..f42fa2d37 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -37,7 +37,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
@@ -81,7 +81,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
@@ -122,7 +122,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (city String, population Long)
|TBLPROPERTIES ('write-mode'='append-only',
'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData.toDS
@@ -166,7 +166,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
intercept[RuntimeException] {
@@ -190,7 +190,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (start Timestamp, stockId INT, avg_price
DOUBLE)
|TBLPROPERTIES ('write-mode'='append-only', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Long, Int, Double)]
val data = inputData.toDS
@@ -241,7 +241,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val date = Date.valueOf("2023-08-10")
spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2,
'2023-08-09')")
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
index 44b76c8ad..30e112ab5 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/CreateAndDeleteTagProcedureTest.scala
@@ -36,7 +36,7 @@ class CreateAndDeleteTagProcedureTest extends
PaimonSparkTestBase with StreamTes
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
diff --git
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
index 647f29853..d110e68bf 100644
---
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
+++
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/RollbackProcedureTest.scala
@@ -36,7 +36,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a',
'write-mode'='change-log', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 5cae15cd2..7239efad3 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -137,7 +137,7 @@ public class CompactProcedure extends BaseProcedure {
}
}
- Dataset<Row> row =
spark().read().format("paimon").load(coreOptions.path().getPath());
+ Dataset<Row> row =
spark().read().format("paimon").load(coreOptions.path().toString());
row = StringUtils.isBlank(filter) ? row : row.where(filter);
new WriteIntoPaimonTable(
table,
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
index 9c1de89c4..8782aebeb 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonCDCSourceTest.scala
@@ -43,7 +43,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with
StreamTest {
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
val table = loadTable(tableName)
- val location = table.location().getPath
+ val location = table.location().toString
val readStream = spark.readStream
.format("paimon")
@@ -94,7 +94,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with
StreamTest {
spark.sql(s"INSERT INTO $tableName VALUES (2, 'v_2_new')")
val table = loadTable(tableName)
- val location = table.location().getPath
+ val location = table.location().toString
val readStream = spark.readStream
.format("paimon")
@@ -146,7 +146,7 @@ class PaimonCDCSourceTest extends PaimonSparkTestBase with
StreamTest {
|""".stripMargin)
val table = loadTable(tableName)
- val location = table.location().getPath
+ val location = table.location().toString
// streaming write
val inputData = MemoryStream[(Int, String)]
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
index 75a6738d0..8e0eb6b7e 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSinkTest.scala
@@ -37,7 +37,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
@@ -81,7 +81,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
@@ -122,7 +122,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (city String, population Long)
|TBLPROPERTIES ('bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData.toDS
@@ -166,7 +166,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
intercept[RuntimeException] {
@@ -190,7 +190,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (start Timestamp, stockId INT, avg_price
DOUBLE)
|TBLPROPERTIES ('bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Long, Int, Double)]
val data = inputData.toDS
@@ -241,7 +241,7 @@ class PaimonSinkTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val date = Date.valueOf("2023-08-10")
spark.sql("INSERT INTO T VALUES (1, '2023-08-09'), (2,
'2023-08-09')")
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
index 60aa7e1fd..f9f5038d4 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/PaimonSourceTest.scala
@@ -731,7 +731,7 @@ class PaimonSourceTest extends PaimonSparkTestBase with
StreamTest {
|TBLPROPERTIES ($primaryKeysProp 'bucket'='2',
'file.format'='parquet')
|""".stripMargin)
val table = loadTable(tableName)
- val location = table.location().getPath
+ val location = table.location().toString
val mergedData = scala.collection.mutable.TreeMap.empty[Int, String]
val unmergedData = scala.collection.mutable.ArrayBuffer.empty[(Int,
String)]
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
index 0e56be021..5e826237b 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTest.scala
@@ -40,7 +40,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b INT)
|TBLPROPERTIES ('bucket'='-1')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, Int)]
val stream = inputData
@@ -117,7 +117,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with
StreamTest {
|TBLPROPERTIES ('bucket'='-1')
|PARTITIONED BY (p)
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, Int, Int)]
val stream = inputData
@@ -213,7 +213,7 @@ class CompactProcedureTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b INT)
|TBLPROPERTIES ('primary-key'='a,b', 'bucket'='1')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, Int)]
val stream = inputData
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
index 463173ced..f9f559273 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/CreateAndDeleteTagProcedureTest.scala
@@ -36,7 +36,7 @@ class CreateAndDeleteTagProcedureTest extends
PaimonSparkTestBase with StreamTes
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
index 2b77058a0..71075f201 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala
@@ -36,7 +36,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with
StreamTest {
|CREATE TABLE T (a INT, b STRING)
|TBLPROPERTIES ('primary-key'='a', 'bucket'='3')
|""".stripMargin)
- val location = loadTable("T").location().getPath
+ val location = loadTable("T").location().toString
val inputData = MemoryStream[(Int, String)]
val stream = inputData
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
index c6613b991..5e7493629 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DataFrameWriteTest.scala
@@ -46,7 +46,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
|""".stripMargin)
val paimonTable = loadTable("T")
- val location = paimonTable.location().getPath
+ val location = paimonTable.location().toString
val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
df1.write.format("paimon").mode("append").save(location)
@@ -92,7 +92,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
|""".stripMargin)
val paimonTable = loadTable("T")
- val location = paimonTable.location().getPath
+ val location = paimonTable.location().toString
val df1 = Seq((1, "a"), (2, "b")).toDF("a", "b")
df1.write.format("paimon").mode("append").save(location)
@@ -181,7 +181,7 @@ class DataFrameWriteTest extends PaimonSparkTestBase {
|""".stripMargin)
val paimonTable = loadTable("T")
- val location = paimonTable.location().getPath
+ val location = paimonTable.location().toString
val df1 = Seq((1, "2023-08-01"), (2, "2023-08-02")).toDF("a", "b")
df1.write.format("paimon").mode("append").save(location)