This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b249c84272 [core] Do not use path.toUri().toString() when a location string is expected internally (#7098)
b249c84272 is described below
commit b249c842723e5131484cd319d7a142629638e19d
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Jan 22 11:19:12 2026 +0800
[core] Do not use path.toUri().toString() when a location string is expected internally (#7098)
---
.../test/java/org/apache/paimon/fs/PathTest.java | 46 +++++++
.../org/apache/paimon/catalog/CatalogContext.java | 2 +-
.../org/apache/paimon/catalog/CatalogFactory.java | 2 +-
.../org/apache/paimon/io/DataFilePathFactory.java | 2 +-
.../org/apache/paimon/io/PojoDataFileMeta.java | 4 +-
.../paimon/operation/LocalOrphanFilesClean.java | 2 +-
.../paimon/table/AbstractFileStoreTable.java | 3 +-
.../apache/paimon/flink/copy/CopyFilesUtil.java | 4 +-
.../paimon/flink/orphan/FlinkOrphanFilesClean.java | 2 +-
.../java/org/apache/paimon/hive/HiveCatalog.java | 4 +-
.../org/apache/paimon/hive/PaimonMetaHook.java | 4 +-
.../paimon/hive/utils/HiveSplitGenerator.java | 2 +-
.../apache/paimon/spark/copy/CopyFilesUtil.java | 4 +-
.../paimon/spark/PaimonRecordReaderIterator.scala | 4 +-
.../spark/commands/PaimonRowLevelCommand.scala | 11 +-
.../spark/procedure/SparkOrphanFilesClean.scala | 4 +-
.../apache/paimon/spark/sql/PaimonQueryTest.scala | 6 +-
.../spark/sql/SpecialCharacterPathTest.scala | 145 +++++++++++++++++++++
18 files changed, 220 insertions(+), 31 deletions(-)
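For context: Path.toString() returns the raw location string, while Path.toUri().toString() percent-encodes special characters, so feeding the encoded form back into new Path(...) encodes it a second time. A minimal sketch of that behavior, mirroring the assertions in the new PathTest below (the class name and main method here are only illustrative):

import org.apache.paimon.fs.Path;

public class PathEncodingSketch {
    public static void main(String[] args) {
        Path p = new Path("file:/tmp/path/test file");
        // toString() keeps the raw location
        System.out.println(p);                           // file:/tmp/path/test file
        // toUri().toString() percent-encodes the space
        System.out.println(p.toUri());                   // file:/tmp/path/test%20file
        // round-tripping the encoded form through new Path(...) encodes again
        Path doubled = new Path(p.toUri().toString());
        System.out.println(doubled.toUri());             // file:/tmp/path/test%2520file
    }
}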
diff --git a/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java b/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java
new file mode 100644
index 0000000000..ae3e84b1f2
--- /dev/null
+++ b/paimon-api/src/test/java/org/apache/paimon/fs/PathTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.fs;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link Path}. */
+public class PathTest {
+
+ @Test
+ public void testPathWithSpecialCharacters() {
+ String location = "file:/tmp/path/test file";
+ String encodedLocation = "file:/tmp/path/test%20file";
+ Path p1 = new Path(location);
+ assertThat(p1.toString()).isEqualTo(location);
+ assertThat(p1.toUri().toString()).isEqualTo(encodedLocation);
+
+ // Using p1.toString() to create a new Path is correct
+ Path p2 = new Path(p1.toString());
+ assertThat(p2.toString()).isEqualTo(location);
+ assertThat(p2.toUri().toString()).isEqualTo(encodedLocation);
+
+ // Using p1.toUri().toString to create a new Path will encode again
+ Path p3 = new Path(p1.toUri().toString());
+ assertThat(p3.toString()).isEqualTo(encodedLocation);
+ assertThat(p3.toUri().toString()).isEqualTo("file:/tmp/path/test%2520file");
+ }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
index 9e9cd2fd85..9eb8a55389 100644
--- a/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
+++ b/paimon-common/src/main/java/org/apache/paimon/catalog/CatalogContext.java
@@ -64,7 +64,7 @@ public class CatalogContext implements Serializable {
public static CatalogContext create(Path warehouse) {
Options options = new Options();
- options.set(WAREHOUSE, warehouse.toUri().toString());
+ options.set(WAREHOUSE, warehouse.toString());
return create(options);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
index 0739014c57..bccc53b7ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogFactory.java
@@ -88,7 +88,7 @@ public interface CatalogFactory extends Factory {
// manual validation
// because different catalog types may have different options
// we can't list them all in the optionalOptions() method
- String warehouse = warehouse(context).toUri().toString();
+ String warehouse = warehouse(context).toString();
Path warehousePath = new Path(warehouse);
FileIO fileIO;
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index e8ee2212f3..b63a1c0b7a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -143,7 +143,7 @@ public class DataFilePathFactory {
Optional<String> externalPathDir =
Optional.ofNullable(aligned.externalPath())
.map(Path::new)
- .map(p -> p.getParent().toUri().toString());
+ .map(p -> p.getParent().toString());
return new Path(externalPathDir.map(Path::new).orElse(parent), fileName);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
index 49d34a9ac7..a7365f2906 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/PojoDataFileMeta.java
@@ -229,9 +229,7 @@ public class PojoDataFileMeta implements DataFileMeta {
@Override
public Optional<String> externalPathDir() {
- return Optional.ofNullable(externalPath)
- .map(Path::new)
- .map(p -> p.getParent().toUri().toString());
+ return Optional.ofNullable(externalPath).map(Path::new).map(p -> p.getParent().toString());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 34ff3fb721..76c3fe7c15 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -146,7 +146,7 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
Set<Path> bucketDirs =
deleteFiles.stream()
.map(Path::getParent)
- .filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX))
+ .filter(path -> path.toString().contains(BUCKET_PATH_PREFIX))
.collect(Collectors.toSet());
randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirs);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 35949062e3..b02b0ea958 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -186,8 +186,7 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
public Identifier identifier() {
Identifier identifier = catalogEnvironment.identifier();
return identifier == null
- ? SchemaManager.identifierFromPath(
- location().toUri().toString(), true, currentBranch())
+ ? SchemaManager.identifierFromPath(location().toString(), true, currentBranch())
: identifier;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
index a78eddab21..a3adec13b8 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/copy/CopyFilesUtil.java
@@ -248,7 +248,7 @@ public class CopyFilesUtil {
Path relativePath = getPathExcludeTableRoot(file, sourceTableRoot);
result.add(
new CopyFileInfo(
- file.toUri().toString(),
+ file.toString(),
relativePath.toString(),
sourceIdentifier,
targetIdentifier));
@@ -257,7 +257,7 @@ public class CopyFilesUtil {
}
public static Path getPathExcludeTableRoot(Path absolutePath, Path sourceTableRoot) {
- String fileAbsolutePath = absolutePath.toUri().toString();
+ String fileAbsolutePath = absolutePath.toString();
String sourceTableRootPath = sourceTableRoot.toString();
Preconditions.checkState(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
index bbf1c2170e..1a91d937d7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java
@@ -285,7 +285,7 @@ public class FlinkOrphanFilesClean extends OrphanFilesClean {
if (oldEnough(file)) {
out.collect(
Tuple2.of(
- file.getPath().toUri().toString(),
+ file.getPath().toString(),
file.getLen()));
}
}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 1eca498714..cd295133b9 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -451,7 +451,7 @@ public class HiveCatalog extends AbstractCatalog {
}
private String getDataFilePath(Identifier tableIdentifier, Table hmsTable) {
- String tableLocation = getTableLocation(tableIdentifier, hmsTable).toUri().toString();
+ String tableLocation = getTableLocation(tableIdentifier, hmsTable).toString();
return hmsTable.getParameters().containsKey(DATA_FILE_PATH_DIRECTORY.key())
? tableLocation
+ Path.SEPARATOR
@@ -1778,7 +1778,7 @@ public class HiveCatalog extends AbstractCatalog {
hiveConf,
options.get(HiveCatalogOptions.METASTORE_CLIENT_CLASS),
context,
- warehouse.toUri().toString());
+ warehouse.toString());
}
public static HiveConf createHiveConf(CatalogContext context) {
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index 5cc826b554..003f72ce3b 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -86,8 +86,8 @@ public class PaimonMetaHook implements HiveMetaHook {
String warehouse =
conf.get(HiveConf.ConfVars.METASTOREWAREHOUSE.varname);
org.apache.hadoop.fs.Path hadoopPath =
getDnsPath(new org.apache.hadoop.fs.Path(warehouse), conf);
- warehouse = hadoopPath.toUri().toString();
- location = AbstractCatalog.newTableLocation(warehouse, identifier).toUri().toString();
+ warehouse = hadoopPath.toString();
+ location = AbstractCatalog.newTableLocation(warehouse, identifier).toString();
table.getSd().setLocation(location);
}
diff --git a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
index 57186709e5..0117434296 100644
--- a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
+++ b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/utils/HiveSplitGenerator.java
@@ -80,7 +80,7 @@ public class HiveSplitGenerator {
List<PaimonInputSplit> splits = new ArrayList<>();
// locations may contain multiple partitions
for (String location : locations.split(",")) {
- if (!location.startsWith(table.location().toUri().toString())) {
+ if (!location.startsWith(table.location().toString())) {
// Hive create dummy file for empty table or partition. If this location doesn't
// belong to this table, nothing to do.
continue;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
index 092fbc238c..1a1e12efbe 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/copy/CopyFilesUtil.java
@@ -91,8 +91,6 @@ public class CopyFilesUtil {
}
public static Optional<String> externalPathDir(String externalPath) {
- return Optional.ofNullable(externalPath)
- .map(Path::new)
- .map(p -> p.getParent().toUri().toString());
+ return Optional.ofNullable(externalPath).map(Path::new).map(p -> p.getParent().toString());
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
index 74f38ca664..444b6d6c64 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonRecordReaderIterator.scala
@@ -96,7 +96,7 @@ case class PaimonRecordReaderIterator(
iter match {
case fileRecordIterator: FileRecordIterator[_] =>
if (lastFilePath != fileRecordIterator.filePath()) {
- PaimonUtils.setInputFileName(fileRecordIterator.filePath().toUri.toString)
+ PaimonUtils.setInputFileName(fileRecordIterator.filePath().toString)
lastFilePath = fileRecordIterator.filePath()
}
case i =>
@@ -157,7 +157,7 @@ case class PaimonRecordReaderIterator(
case PaimonMetadataColumn.ROW_INDEX_COLUMN =>
metadataRow.setField(index, fileRecordIterator.returnedPosition())
case PaimonMetadataColumn.FILE_PATH_COLUMN =>
- metadataRow.setField(index, BinaryString.fromString(lastFilePath.toUri.toString))
+ metadataRow.setField(index, BinaryString.fromString(lastFilePath.toString))
case PaimonMetadataColumn.PARTITION_COLUMN =>
metadataRow.setField(index, split.asInstanceOf[DataSplit].partition())
case PaimonMetadataColumn.BUCKET_COLUMN =>
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index c82da4badf..f9dc3b14c7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -20,6 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
import org.apache.paimon.deletionvectors.{Bitmap64DeletionVector, BitmapDeletionVector, DeletionVector}
+import org.apache.paimon.fs.Path
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
@@ -156,8 +157,9 @@ trait PaimonRowLevelCommand
sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
import sparkSession.implicits._
// convert to a serializable map
- val dataFileToPartitionAndBucket = dataFilePathToMeta.map {
- case (k, v) => k -> (v.bucketPath, v.partition, v.bucket)
+ val dataFileNameToPartitionAndBucket = dataFilePathToMeta.map {
+ case (k, v) =>
+ new Path(k).getName -> (v.bucketPath, v.partition, v.bucket, k)
}
val my_table = table
@@ -174,12 +176,13 @@ trait PaimonRowLevelCommand
dv.delete(iter.next()._2)
}
- val (bucketPath, partition, bucket) = dataFileToPartitionAndBucket.apply(filePath)
+ val (bucketPath, partition, bucket, dataFilePath) =
+ dataFileNameToPartitionAndBucket.apply(new Path(filePath).getName)
SparkDeletionVector(
bucketPath,
SerializationUtils.serializeBinaryRow(partition),
bucket,
- filePath,
+ dataFilePath,
DeletionVector.serializeToBytes(dv)
)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 9e435d8b44..11f1364c18 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -119,7 +119,7 @@ case class SparkOrphanFilesClean(
.toDF("used_name")
// find candidate files which can be removed
- val fileDirs = listPaimonFileDirs.asScala.map(_.toUri.toString).toSeq
+ val fileDirs = listPaimonFileDirs.asScala.map(_.toString).toSeq
val maxFileDirsParallelism = Math.min(fileDirs.size, parallelism)
val candidates = spark.sparkContext
.parallelize(fileDirs, maxFileDirsParallelism)
@@ -128,7 +128,7 @@ case class SparkOrphanFilesClean(
tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
file =>
val path = file.getPath
- (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
+ (path.getName, path.toString, file.getLen, path.getParent.toString)
}
}
.toDF("name", "path", "len", "dataDir")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
index 5d0864cfe5..317182f7ee 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonQueryTest.scala
@@ -47,7 +47,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
|VALUES (1, 'x1'), (2, 'x3'), (3, 'x3'), (4, 'x4'), (5, 'x5')
|""".stripMargin)
- val location = loadTable("T").location().toUri.toString
+ val location = loadTable("T").location().toString
val res = spark.sql(
s"""
|SELECT SUM(cnt)
@@ -128,7 +128,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
|TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
|""".stripMargin)
- val location = loadTable("T").location().toUri.toString
+ val location = loadTable("T").location().toString
spark.sql("INSERT INTO T VALUES (1, 'x1'), (3, 'x3')")
@@ -175,7 +175,7 @@ class PaimonQueryTest extends PaimonSparkTestBase {
|TBLPROPERTIES ('file.format'='$fileFormat' $bucketProp)
|""".stripMargin)
- val location = loadTable("T").location().toUri.toString
+ val location = loadTable("T").location().toString
spark.sql("INSERT INTO T VALUES (1, 'x1', '2024'), (3, 'x3', '2024')")
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala
new file mode 100644
index 0000000000..d056e8baaf
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/SpecialCharacterPathTest.scala
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.utils.DateTimeUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+import java.util.concurrent.TimeUnit
+
+class SpecialCharacterPathTest extends PaimonSparkTestBase {
+
+ val warehouseDir: String = tempDBDir.getCanonicalPath + "/22%3A45%3A30"
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf.set("spark.sql.catalog.paimon.warehouse", warehouseDir)
+ }
+
+ test("delete with timestamp partition column") {
+ for (useV2Write <- Seq("true", "false")) {
+ for (dvEnabled <- Seq("true", "false")) {
+ withSparkSQLConf("spark.paimon.write.use-v2-write" -> useV2Write) {
+ withTable("t") {
+ sql(s"""
+ |CREATE TABLE t (id INT, dt TIMESTAMP)
+ |PARTITIONED BY (dt)
+ |TBLPROPERTIES (
+ | 'deletion-vectors.enabled' = '$dvEnabled'
+ |)
+ |""".stripMargin)
+ val table = loadTable("t")
+ val fileIO = table.fileIO()
+
+ sql(
+ "INSERT INTO t SELECT id, CAST('2024-06-02 15:45:30' AS TIMESTAMP) FROM RANGE(0, 1000)")
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1000)))
+ checkAnswer(sql("SELECT sum(id) FROM t"), Seq(Row(499500)))
+
+ val fileDir = new Path(warehouseDir, "test.db/t/dt=2024-06-02T22%3A45%3A30/bucket-0")
+ assert(fileIO.exists(fileDir))
+ val filePath =
+ sql("SELECT __paimon_file_path FROM t LIMIT 1").collect().head.getString(0)
+ assert(fileIO.exists(new Path(filePath)))
+
+ sql("DELETE FROM t WHERE id = 100")
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(999)))
+ checkAnswer(sql("SELECT sum(id) FROM t"), Seq(Row(499400)))
+ }
+ }
+ }
+ }
+ }
+
+ test("remove orphan files") {
+ withTable("t") {
+ val orphanFile1Name = "bucket-0/orphan_file1"
+ val orphanFile2Name = "bucket-0/orphan_file2"
+
+ sql("CREATE TABLE t (id STRING, name STRING)")
+
+ sql(s"INSERT INTO t VALUES ('1', 'a'), ('2', 'b')")
+
+ val table = loadTable("t")
+ val fileIO = table.fileIO()
+ val tablePath = table.location()
+
+ val orphanFile1 = new Path(tablePath, orphanFile1Name)
+ val orphanFile2 = new Path(tablePath, orphanFile2Name)
+
+ fileIO.tryToWriteAtomic(orphanFile1, "a")
+ Thread.sleep(2000)
+ fileIO.tryToWriteAtomic(orphanFile2, "b")
+
+ val orphanFileModTime = fileIO.getFileStatus(orphanFile2).getModificationTime
+ val older_than = DateTimeUtils.formatLocalDateTime(
+ DateTimeUtils.toLocalDateTime(
+ orphanFileModTime -
+ TimeUnit.SECONDS.toMillis(1)),
+ 3)
+ checkAnswer(
+ sql(s"CALL sys.remove_orphan_files(table => 't', older_than => '$older_than')"),
+ Row(1, 1) :: Nil)
+ }
+ }
+
+ test("data file external paths") {
+ withTempDir {
+ tmpDir =>
+ withTable("t") {
+ val externalPath = s"file://${tmpDir.getCanonicalPath}/22%3A45%3A3123456"
+ sql(s"""
+ |CREATE TABLE t (id INT, v INT, dt TIMESTAMP)
+ |TBLPROPERTIES (
+ | 'bucket-key' = 'id',
+ | 'bucket' = '1',
+ | 'data-file.external-paths' = '$externalPath',
+ | 'data-file.external-paths.strategy' = 'round-robin'
+ |)
+ |PARTITIONED BY (dt)
+ |""".stripMargin)
+ sql(
+ "INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id, CAST('2024-06-02 15:45:30' AS TIMESTAMP) FROM range (1, 50000)")
+
+ val filePath =
+ sql("SELECT __paimon_file_path FROM t WHERE __paimon_file_path LIKE '%123456%' LIMIT 1")
+ .collect()
+ .head
+ .getString(0)
+ assert(fileIO.exists(new Path(filePath)))
+ assert(fileIO.exists(new Path(externalPath)))
+
+ sql("DELETE FROM t WHERE id >= 111 and id <= 444")
+ checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+ checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882315L))
+
+ sql("UPDATE t SET v = v + 1 WHERE id >= 555 and id <= 666")
+ checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+ checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882427L))
+
+ sql("UPDATE t SET v = v + 1 WHERE id >= 600 and id <= 800")
+ checkAnswer(sql("SELECT count(*) FROM t"), Row(49665))
+ checkAnswer(sql("SELECT sum(v) FROM t"), Row(1249882628L))
+ }
+ }
+ }
+}