This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b249c84272 [core] Do not use path.toUri().toString() when a location string is expected internally (#7098)
b249c84272 is described below

commit b249c842723e5131484cd319d7a142629638e19d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 22 11:19:12 2026 +0800

    [core] Do not use path.toUri().toString() when a location string is expected internally (#7098)
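
    For context, a minimal sketch of the pitfall this change guards against
    (values mirror the new PathTest below; purely illustrative):

        // org.apache.paimon.fs.Path keeps the raw location string,
        // while toUri() percent-encodes special characters.
        Path p = new Path("file:/tmp/path/test file");
        p.toString();          // "file:/tmp/path/test file"
        p.toUri().toString();  // "file:/tmp/path/test%20file"

        // Re-wrapping the encoded form encodes it a second time,
        // so the resulting location no longer matches the real file.
        Path again = new Path(p.toUri().toString());
        again.toUri().toString();  // "file:/tmp/path/test%2520file"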
---
 .../test/java/org/apache/paimon/fs/PathTest.java   |  46 +++++++
 .../org/apache/paimon/catalog/CatalogContext.java  |   2 +-
 .../org/apache/paimon/catalog/CatalogFactory.java  |   2 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  |   2 +-
 .../org/apache/paimon/io/PojoDataFileMeta.java     |   4 +-
 .../paimon/operation/LocalOrphanFilesClean.java    |   2 +-
 .../paimon/table/AbstractFileStoreTable.java       |   3 +-
 .../apache/paimon/flink/copy/CopyFilesUtil.java    |   4 +-
 .../paimon/flink/orphan/FlinkOrphanFilesClean.java |   2 +-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   4 +-
 .../org/apache/paimon/hive/PaimonMetaHook.java     |   4 +-
 .../paimon/hive/utils/HiveSplitGenerator.java      |   2 +-
 .../apache/paimon/spark/copy/CopyFilesUtil.java    |   4 +-
 .../paimon/spark/PaimonRecordReaderIterator.scala  |   4 +-
 .../spark/commands/PaimonRowLevelCommand.scala     |  11 +-
 .../spark/procedure/SparkOrphanFilesClean.scala    |   4 +-
 .../apache/paimon/spark/sql/PaimonQueryTest.scala  |   6 +-
 .../spark/sql/SpecialCharacterPathTest.scala       | 145 +++++++++++++++++++++
 18 files changed, 220 insertions(+), 31 deletions(-)

diff --git a/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java b/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java
new file mode 100644
index 0000000000..ae3e84b1f2
--- /dev/null
+++ b/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Path}. */
+public class PathTest {
+
+    @Test
+    public void testPathWithSpecialCharacters() {
+        String location = "file:/tmp/path/test file";
+        String encodedLocation = "file:/tmp/path/test%20file";
+        Path p1 = new Path(location);
+        assertThat(p1.toString()).isEqualTo(location);
+        assertThat(p1.toUri().toString()).isEqualTo(encodedLocation);
+
+        // Using p1.toString() to create a new Path is correct
+        Path p2 = new Path(p1.toString());
+        assertThat(p2.toString()).isEqualTo(location);
+        assertThat(p2.toUri().toString()).isEqualTo(encodedLocation);
+
+    // Using p1.toUri().toString() to create a new Path will encode again
+        Path p3 = new Path(p1.toUri().toString());
+        assertThat(p3.toString()).isEqualTo(encodedLocation);
+        assertThat(p3.toUri().toString()).isEqualTo("file:/tmp/path/test%2520file");
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
index 9e9cd2fd85..9eb8a55389 100644
--- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
@@ -64,7 +64,7 @@ public class CatalogContext implements Serializable {
 
     public static CatalogContext create(Path warehouse) {
         Options options = new Options();
-        options.set(WAREHOUSE, warehouse.toUri().toString());
+        options.set(WAREHOUSE, warehouse.toString());
         return create(options);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 0739014c57..bccc53b7ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -88,7 +88,7 @@ public interface CatalogFactory extends Factory {
         // manual validation
         // because different catalog types may have different options
         // we can't list them all in the optionalOptions() method
-        String warehouse = warehouse(context).toUri().toString();
+        String warehouse = warehouse(context).toString();
 
         Path warehousePath = new Path(warehouse);
         FileIO fileIO;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index e8ee2212f3..b63a1c0b7a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -143,7 +143,7 @@ public class DataFilePathFactory {
         Optional<String> externalPathDir =
                 Optional.ofNullable(aligned.externalPath())
                         .map(Path::new)
-                        .map(p -> p.getParent().toUri().toString());
+                        .map(p -> p.getParent().toString());
         return new Path(externalPathDir.map(Path::new).orElse(parent), fileName);
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 49d34a9ac7..a7365f2906 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -229,9 +229,7 @@ public class PojoDataFileMeta implements DataFileMeta {
 
     @Override
     public Optional<String> externalPathDir() {
-        return Optional.ofNullable(externalPath)
-                .map(Path::new)
-                .map(p -> p.getParent().toUri().toString());
+        return Optional.ofNullable(externalPath).map(Path::new).map(p -> p.getParent().toString());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 34ff3fb721..76c3fe7c15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -146,7 +146,7 @@ public class LocalOrphanFilesClean {
         Set<Path> bucketDirs =
                 deleteFiles.stream()
                         .map(Path::getParent)
-                        .filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX))
+                        .filter(path -> path.toString().contains(BUCKET_PATH_PREFIX))
                         .collect(Collectors.toSet());
         randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirs);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 35949062e3..b02b0ea958 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -186,8 +186,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
     public Identifier identifier() {
         Identifier identifier = catalogEnvironment.identifier();
         return identifier == null
-                ? SchemaManager.identifierFromPath(
-                        location().toUri().toString(), true, currentBranch())
+                ? SchemaManager.identifierFromPath(location().toString(), true, currentBranch())
                 : identifier;
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
index a78eddab21..a3adec13b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
@@ -248,7 +248,7 @@ public class CopyFilesUtil {
             Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
             result.add(
                     new CopyFileInfo(
-                            file.toUri().toString(),
+                            file.toString(),
                             relativePath.toString(),
                             sourceIdentifier,
                             targetIdentifier));
@@ -257,7 +257,7 @@ public class CopyFilesUtil {
     }
 
     public static Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
-        String fileAbsolutePath = absolutePath.toUri().toString();
+        String fileAbsolutePath = absolutePath.toString();
         String sourceTableRootPath = sourceTableRoot.toString();
 
         Preconditions.checkState(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index bbf1c2170e..1a91d937d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -285,7 +285,7 @@ public class FlinkOrphanFilesClean {
                                             if (oldEnough(file)) {
                                                 out.collect(
                                                         Tuple2.of(
-                                                                file.getPath().toUri().toString(),
+                                                                file.getPath().toString(),
                                                                 file.getLen()));
                                             }
                                         }
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 1eca498714..cd295133b9 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -451,7 +451,7 @@ public class HiveCatalog extends AbstractCatalog {
     }
 
     private String getDataFilePath(Identifier tableIdentifier, Table hmsTable) {
-        String tableLocation = getTableLocation(tableIdentifier, hmsTable).toUri().toString();
+        String tableLocation = getTableLocation(tableIdentifier, hmsTable).toString();
         return hmsTable.getParameters().containsKey(DATA_FILE_PATH_DIRECTORY.key())
                 ? tableLocation
                         + Path.SEPARATOR
@@ -1778,7 +1778,7 @@ public class HiveCatalog extends AbstractCatalog {
                 hiveConf,
                 options.get(HiveCatalogOptions.METASTORE_CLIENT_CLASS),
                 context,
-                warehouse.toUri().toString());
+                warehouse.toString());
     }
 
     public static HiveConf createHiveConf(CatalogContext context) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index 5cc826b554..003f72ce3b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -86,8 +86,8 @@ public class PaimonMetaHook implements HiveMetaHook {
             String warehouse = conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
             org.apache.hadoop.fs.Path hadoopPath =
                     getDnsPath(new org.apache.hadoop.fs.Path(warehouse), conf);
-            warehouse = hadoopPath.toUri().toString();
-            location = AbstractCatalog.newTableLocation(warehouse, identifier).toUri().toString();
+            warehouse = hadoopPath.toString();
+            location = AbstractCatalog.newTableLocation(warehouse, identifier).toString();
             table.getSd().setLocation(location);
         }
 
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 57186709e5..0117434296 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -80,7 +80,7 @@ public class HiveSplitGenerator {
         List<PaimonInputSplit> splits = new ArrayList<>();
         // locations may contain multiple partitions
         for (String location : locations.split(",")) {
-            if (!location.startsWith(table.location().toUri().toString())) {
+            if (!location.startsWith(table.location().toString())) {
                 // Hive create dummy file for empty table or partition. If this location doesn't
                 // belong to this table, nothing to do.
                 continue;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
index 092fbc238c..1a1e12efbe 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
@@ -91,8 +91,6 @@ public class CopyFilesUtil {
     }
 
     public static Optional<String> externalPathDir(String externalPath) {
-        return Optional.ofNullable(externalPath)
-                .map(Path::new)
-                .map(p -> p.getParent().toUri().toString());
+        return Optional.ofNullable(externalPath).map(Path::new).map(p -> p.getParent().toString());
     }
 }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 74f38ca664..444b6d6c64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -96,7 +96,7 @@ case class PaimonRecordReaderIterator(
     iter match {
       case fileRecordIterator: FileRecordIterator[_] =>
         if (lastFilePath != fileRecordIterator.filePath()) {
-          PaimonUtils.setInputFileName(fileRecordIterator.filePath().toUri.toString)
+          PaimonUtils.setInputFileName(fileRecordIterator.filePath().toString)
           lastFilePath = fileRecordIterator.filePath()
         }
       case i =>
@@ -157,7 +157,7 @@ case class PaimonRecordReaderIterator(
           case PaimonMetadataColumn.ROW_INDEX_COLUMN =>
             metadataRow.setField(index, fileRecordIterator.returnedPosition())
           case PaimonMetadataColumn.FILE_PATH_COLUMN =>
-            metadataRow.setField(index, BinaryString.fromString(lastFilePath.toUri.toString))
+            metadataRow.setField(index, BinaryString.fromString(lastFilePath.toString))
           case PaimonMetadataColumn.PARTITION_COLUMN =>
             metadataRow.setField(index, split.asInstanceOf[DataSplit].partition())
           case PaimonMetadataColumn.BUCKET_COLUMN =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index c82da4badf..f9dc3b14c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
 
 import org.apache.paimon.CoreOptions
 import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletionVector, DeletionVector}
+import org.apache.paimon.fs.Path
 import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
 import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
 import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
@@ -156,8 +157,9 @@ trait PaimonRowLevelCommand
       sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
     import sparkSession.implicits._
     // convert to a serializable map
-    val dataFileToPartitionAndBucket = dataFilePathToMeta.map {
-      case (k, v) => k -> (v.bucketPath, v.partition, v.bucket)
+    val dataFileNameToPartitionAndBucket = dataFilePathToMeta.map {
+      case (k, v) =>
+        new Path(k).getName -> (v.bucketPath, v.partition, v.bucket, k)
     }
 
     val my_table = table
@@ -174,12 +176,13 @@ trait PaimonRowLevelCommand
             dv.delete(iter.next()._2)
           }
 
-          val (bucketPath, partition, bucket) = dataFileToPartitionAndBucket.apply(filePath)
+          val (bucketPath, partition, bucket, dataFilePath) =
+            dataFileNameToPartitionAndBucket.apply(new Path(filePath).getName)
           SparkDeletionVector(
             bucketPath,
             SerializationUtils.serializeBinaryRow(partition),
             bucket,
-            filePath,
+            dataFilePath,
             DeletionVector.serializeToBytes(dv)
           )
       }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 9e435d8b44..11f1364c18 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -119,7 +119,7 @@ case class SparkOrphanFilesClean(
       .toDF("used_name")
 
     // find candidate files which can be removed
-    val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString).toSeq
+    val fileDirs = listPaimonFileDirs.asScala.map(_.toString).toSeq
     val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism)
     val candidates = spark.sparkContext
       .parallelize(fileDirs, maxFileDirsParallelism)
@@ -128,7 +128,7 @@ case class SparkOrphanFilesClean(
           tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
             file =>
               val path = file.getPath
-              (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
+              (path.getName, path.toString, file.getLen, path.getParent.toString)
           }
       }
       .toDF("name", "path", "len", "dataDir")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index 5d0864cfe5..317182f7ee 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -47,7 +47,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
                       |VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')
                       |""".stripMargin)
 
-          val location = loadTable("T").location().toUri.toString
+          val location = loadTable("T").location().toString
           val res = spark.sql(
             s"""
                |SELECT SUM(cnt)
@@ -128,7 +128,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
                            |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
                            |""".stripMargin)
 
-              val location = loadTable("T").location().toUri.toString
+              val location = loadTable("T").location().toString
 
               spark.sql("INSERT INTO T VALUES (1, 'x1'), (3, 'x3')")
 
@@ -175,7 +175,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
                            |TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
                            |""".stripMargin)
 
-              val location = loadTable("T").location().toUri.toString
+              val location = loadTable("T").location().toString
 
               spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', 
'2024')")
 
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala
new file mode 100644
index 0000000000..d056e8baaf
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.DateTimeUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+import java.util.concurrent.TimeUnit
+
+class SpecialCharacterPathTest extends PaimonSparkTestBase {
+
+  val warehouseDir: String = tempDBDir.getCanonicalPath + "/22%3A45%3A30"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf.set("spark.sql.catalog.paimon.warehouse", warehouseDir)
+  }
+
+  test("delete with timestamp partition column") {
+    for (useV2Write <- Seq("true", "false")) {
+      for (dvEnabled <- Seq("true", "false")) {
+        withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+          withTable("t") {
+            sql(s"""
+                   |CREATE TABLE t (id INT, dt TIMESTAMP)
+                   |PARTITIONED BY (dt)
+                   |TBLPROPERTIES (
+                   | 'deletion-vectors.enabled' = '$dvEnabled'
+                   |)
+                   |""".stripMargin)
+            val table = loadTable("t")
+            val fileIO = table.fileIO()
+
+            sql(
+              "INSERT INTO t SELECT id, CAST('2024-06-02 15:45:30' AS 
TIMESTAMP) FROM RANGE(0, 1000)")
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+            checkAnswer(sql("SELECT sum(id) FROM t"), Seq(Row(499500)))
+
+            val fileDir = new Path(warehouseDir, "test.db/t/dt=2024-06-02T22%3A45%3A30/bucket-0")
+            assert(fileIO.exists(fileDir))
+            val filePath =
+              sql("SELECT __paimon_file_path FROM t LIMIT 
1").collect().head.getString(0)
+            assert(fileIO.exists(new Path(filePath)))
+
+            sql("DELETE FROM t WHERE id = 100")
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(999)))
+            checkAnswer(sql("SELECT sum(id) FROM t"), Seq(Row(499400)))
+          }
+        }
+      }
+    }
+  }
+
+  test("remove orphan files") {
+    withTable("t") {
+      val orphanFile1Name = "bucket-0/orphan_file1"
+      val orphanFile2Name = "bucket-0/orphan_file2"
+
+      sql("CREATE TABLE t (id STRING, name STRING)")
+
+      sql(s"INSERT INTO t VALUES ('1', 'a'), ('2', 'b')")
+
+      val table = loadTable("t")
+      val fileIO = table.fileIO()
+      val tablePath = table.location()
+
+      val orphanFile1 = new Path(tablePath, orphanFile1Name)
+      val orphanFile2 = new Path(tablePath, orphanFile2Name)
+
+      fileIO.tryToWriteAtomic(orphanFile1, "a")
+      Thread.sleep(2000)
+      fileIO.tryToWriteAtomic(orphanFile2, "b")
+
+      val orphanFileModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
+      val older_than = DateTimeUtils.formatLocalDateTime(
+        DateTimeUtils.toLocalDateTime(
+          orphanFileModTime -
+            TimeUnit.SECONDS.toMillis(1)),
+        3)
+      checkAnswer(
+        sql(s"CALL sys.remove_orphan_files(table => 't', older_than => 
'$older_than')"),
+        Row(1, 1) :: Nil)
+    }
+  }
+
+  test("data file external paths") {
+    withTempDir {
+      tmpDir =>
+        withTable("t") {
+          val externalPath = s"file://${tmpDir.getCanonicalPath}/22%3A45%3A3123456"
+          sql(s"""
+                 |CREATE TABLE t (id INT, v INT, dt TIMESTAMP)
+                 |TBLPROPERTIES (
+                 | 'bucket-key' = 'id',
+                 | 'bucket' = '1',
+                 | 'data-file.external-paths' = '$externalPath',
+                 | 'data-file.external-paths.strategy' = 'round-robin'
+                 |)
+                 |PARTITIONED BY (dt)
+                 |""".stripMargin)
+          sql(
+            "INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id, 
CAST('2024-06-02 15:45:30' AS TIMESTAMP) FROM range (1, 50000)")
+
+          val filePath =
+            sql("SELECT __paimon_file_path FROM t WHERE __paimon_file_path 
LIKE '%123456%' LIMIT 1")
+              .collect()
+              .head
+              .getString(0)
+          assert(fileIO.exists(new Path(filePath)))
+          assert(fileIO.exists(new Path(externalPath)))
+
+          sql("DELETE FROM t WHERE id >= 111 and id <= 444")
+          checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882315L))
+
+          sql("UPDATE t SET v = v + 1 WHERE id >= 555 and id <= 666")
+          checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882427L))
+
+          sql("UPDATE t SET v = v + 1 WHERE id >= 600 and id <= 800")
+          checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882628L))
+        }
+    }
+  }
+}
