This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f561137ef [core] Add file compression type in file names (#4420)
f561137ef is described below

commit f561137ef2b7a5b9bff497f108bc4eedef55dc15
Author: askwang <[email protected]>
AuthorDate: Mon Nov 4 13:14:02 2024 +0800

    [core] Add file compression type in file names (#4420)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 11 +++
 .../java/org/apache/paimon/AbstractFileStore.java  |  4 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |  4 +-
 .../org/apache/paimon/io/DataFilePathFactory.java  | 16 +++-
 .../apache/paimon/utils/FileStorePathFactory.java  | 12 ++-
 .../apache/paimon/append/AppendOnlyWriterTest.java |  4 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |  4 +-
 .../apache/paimon/io/DataFilePathFactoryTest.java  |  8 +-
 .../paimon/io/KeyValueFileReadWriteTest.java       |  8 +-
 .../apache/paimon/io/RollingFileWriterTest.java    |  5 +-
 .../paimon/manifest/ManifestFileMetaTestBase.java  |  4 +-
 .../apache/paimon/manifest/ManifestFileTest.java   |  4 +-
 .../apache/paimon/manifest/ManifestListTest.java   |  4 +-
 .../paimon/utils/FileStorePathFactoryTest.java     |  8 +-
 .../flink/source/TestChangelogDataReadWrite.java   |  4 +-
 .../apache/paimon/spark/SparkFileIndexITCase.java  |  4 +-
 .../org/apache/paimon/spark/SparkWriteITCase.java  | 97 ++++++++++++++++++++++
 18 files changed, 187 insertions(+), 20 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 84ba86124..d9ac0b99b 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -332,6 +332,12 @@ under the License.
             <td>Map</td>
             <td>Define different file format for different level, you can add 
the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file 
format for level is not provided, the default format which set by `file.format` 
will be used.</td>
         </tr>
+        <tr>
+            <td><h5>file.suffix.include.compression</h5></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>Whether to add file compression type in the file name of data 
file and changelog file.</td>
+        </tr>
         <tr>
             <td><h5>force-lookup</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index f4f7f41e2..6ba8d70e0 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -190,6 +190,13 @@ public class CoreOptions implements Serializable {
                     .defaultValue("changelog-")
                     .withDescription("Specify the file name prefix of 
changelog files.");
 
+    public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
+            key("file.suffix.include.compression")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether to add file compression type in the file 
name of data file and changelog file.");
+
     public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
             key("file.block-size")
                     .memoryType()
@@ -1591,6 +1598,10 @@ public class CoreOptions implements Serializable {
         return options.get(CHANGELOG_FILE_PREFIX);
     }
 
+    public boolean fileSuffixIncludeCompression() {
+        return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
+    }
+
     public String fieldsDefaultFunc() {
         return options.get(FIELDS_DEFAULT_AGG_FUNC);
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 811848453..55c1c72df 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -106,7 +106,9 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.fileFormat().getFormatIdentifier(),
                 options.dataFilePrefix(),
                 options.changelogFilePrefix(),
-                options.legacyPartitionName());
+                options.legacyPartitionName(),
+                options.fileSuffixIncludeCompression(),
+                options.fileCompression());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 8f2dbbf5f..8a3bf0b0f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -207,7 +207,9 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                                         format,
                                         options.dataFilePrefix(),
                                         options.changelogFilePrefix(),
-                                        options.legacyPartitionName())));
+                                        options.legacyPartitionName(),
+                                        options.fileSuffixIncludeCompression(),
+                                        options.fileCompression())));
         return pathFactoryMap;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
index 742888b36..b632d44c9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java
@@ -39,18 +39,24 @@ public class DataFilePathFactory {
     private final String formatIdentifier;
     private final String dataFilePrefix;
     private final String changelogFilePrefix;
+    private final boolean fileSuffixIncludeCompression;
+    private final String fileCompression;
 
     public DataFilePathFactory(
             Path parent,
             String formatIdentifier,
             String dataFilePrefix,
-            String changelogFilePrefix) {
+            String changelogFilePrefix,
+            boolean fileSuffixIncludeCompression,
+            String fileCompression) {
         this.parent = parent;
         this.uuid = UUID.randomUUID().toString();
         this.pathCount = new AtomicInteger(0);
         this.formatIdentifier = formatIdentifier;
         this.dataFilePrefix = dataFilePrefix;
         this.changelogFilePrefix = changelogFilePrefix;
+        this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
+        this.fileCompression = fileCompression;
     }
 
     public Path newPath() {
@@ -62,7 +68,13 @@ public class DataFilePathFactory {
     }
 
     private Path newPath(String prefix) {
-        String name = prefix + uuid + "-" + pathCount.getAndIncrement() + "." 
+ formatIdentifier;
+        String extension;
+        if (fileSuffixIncludeCompression) {
+            extension = "." + fileCompression + "." + formatIdentifier;
+        } else {
+            extension = "." + formatIdentifier;
+        }
+        String name = prefix + uuid + "-" + pathCount.getAndIncrement() + 
extension;
         return new Path(parent, name);
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index 612565c72..fcdc4634d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -43,6 +43,8 @@ public class FileStorePathFactory {
     private final String formatIdentifier;
     private final String dataFilePrefix;
     private final String changelogFilePrefix;
+    private final boolean fileSuffixIncludeCompression;
+    private final String fileCompression;
 
     private final AtomicInteger manifestFileCount;
     private final AtomicInteger manifestListCount;
@@ -57,7 +59,9 @@ public class FileStorePathFactory {
             String formatIdentifier,
             String dataFilePrefix,
             String changelogFilePrefix,
-            boolean legacyPartitionName) {
+            boolean legacyPartitionName,
+            boolean fileSuffixIncludeCompression,
+            String fileCompression) {
         this.root = root;
         this.uuid = UUID.randomUUID().toString();
 
@@ -66,6 +70,8 @@ public class FileStorePathFactory {
         this.formatIdentifier = formatIdentifier;
         this.dataFilePrefix = dataFilePrefix;
         this.changelogFilePrefix = changelogFilePrefix;
+        this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
+        this.fileCompression = fileCompression;
 
         this.manifestFileCount = new AtomicInteger(0);
         this.manifestListCount = new AtomicInteger(0);
@@ -113,7 +119,9 @@ public class FileStorePathFactory {
                 bucketPath(partition, bucket),
                 formatIdentifier,
                 dataFilePrefix,
-                changelogFilePrefix);
+                changelogFilePrefix,
+                fileSuffixIncludeCompression,
+                fileCompression);
     }
 
     public Path bucketPath(BinaryRow partition, int bucket) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 04b5fa6e6..a9012ed89 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -522,7 +522,9 @@ public class AppendOnlyWriterTest {
                 new Path(tempDir + "/dt=" + PART + "/bucket-0"),
                 CoreOptions.FILE_FORMAT.defaultValue().toString(),
                 CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+                CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                CoreOptions.FILE_COMPRESSION.defaultValue());
     }
 
     private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index 439fc4999..c29519ce8 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -70,7 +70,9 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         new Path(tempDir + "/dt=1/bucket-1"),
                         format,
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         FileFormat fileFormat = FileFormat.fromIdentifier(format, new 
Options());
         LinkedList<DataFileMeta> toCompact = new LinkedList<>();
         CoreOptions options = new CoreOptions(new HashMap<>());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
index 609566258..d36966c55 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/DataFilePathFactoryTest.java
@@ -38,7 +38,9 @@ public class DataFilePathFactoryTest {
                         new Path(tempDir + "/bucket-123"),
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         String uuid = pathFactory.uuid();
 
         for (int i = 0; i < 20; i++) {
@@ -64,7 +66,9 @@ public class DataFilePathFactoryTest {
                         new Path(tempDir + "/dt=20211224/bucket-123"),
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
-                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
+                        CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         String uuid = pathFactory.uuid();
 
         for (int i = 0; i < 20; i++) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
index de593e80b..52d56afad 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java
@@ -229,7 +229,9 @@ public class KeyValueFileReadWriteTest {
                         format,
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
         FileIO fileIO = FileIOFinder.find(path);
         Options options = new Options();
@@ -246,7 +248,9 @@ public class KeyValueFileReadWriteTest {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()));
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue()));
 
         return KeyValueFileWriterFactory.builder(
                         fileIO,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
index 9bee36b7c..9e1de7145 100644
--- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java
@@ -81,7 +81,10 @@ public class RollingFileWriterTest {
                                                                 .toString(),
                                                         
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                                                         
CoreOptions.CHANGELOG_FILE_PREFIX
-                                                                
.defaultValue())
+                                                                
.defaultValue(),
+                                                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION
+                                                                
.defaultValue(),
+                                                        
CoreOptions.FILE_COMPRESSION.defaultValue())
                                                 .newPath(),
                                         SCHEMA,
                                         fileFormat
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
index 87a6f2e45..5e69035ca 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java
@@ -147,7 +147,9 @@ public abstract class ManifestFileMetaTestBase {
                                 CoreOptions.FILE_FORMAT.defaultValue(),
                                 CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                                 
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                                
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()),
+                                
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                                
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                                CoreOptions.FILE_COMPRESSION.defaultValue()),
                         Long.MAX_VALUE,
                         null)
                 .create();
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
index 30e8e7b20..34cca41e6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileTest.java
@@ -103,7 +103,9 @@ public class ManifestFileTest {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 
1024;
         FileIO fileIO = FileIOFinder.find(path);
         return new ManifestFile.Factory(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
index f4703c1ff..ce4f7b807 100644
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestListTest.java
@@ -107,7 +107,9 @@ public class ManifestListTest {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", 
pathFactory, null)
                 .create();
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
index 26bbd28e4..d4d45b312 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/utils/FileStorePathFactoryTest.java
@@ -88,7 +88,9 @@ public class FileStorePathFactoryTest {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
 
         assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
         assertPartition("20211224", null, pathFactory, 
"/dt=20211224/hr=default");
@@ -126,6 +128,8 @@ public class FileStorePathFactoryTest {
                 CoreOptions.FILE_FORMAT.defaultValue().toString(),
                 CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                 CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                CoreOptions.FILE_COMPRESSION.defaultValue());
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
index 762a14ea1..85679e5fd 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/TestChangelogDataReadWrite.java
@@ -108,7 +108,9 @@ public class TestChangelogDataReadWrite {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
         this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new 
Path(root));
         this.commitUser = UUID.randomUUID().toString();
     }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
index d82f92af5..2251619c4 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkFileIndexITCase.java
@@ -131,7 +131,9 @@ public class SparkFileIndexITCase extends SparkWriteITCase {
                         CoreOptions.FILE_FORMAT.defaultValue().toString(),
                         CoreOptions.DATA_FILE_PREFIX.defaultValue(),
                         CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
-                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
+                        
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
+                        
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
+                        CoreOptions.FILE_COMPRESSION.defaultValue());
 
         Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
         ReadBuilder readBuilder = table.newReadBuilder();
diff --git 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index 0ff104eeb..b0d5b380c 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ 
b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -296,6 +296,9 @@ public class SparkWriteITCase {
         for (String fileName : fileNames) {
             Assertions.assertTrue(fileName.startsWith("test-"));
         }
+
+        // reset config, otherwise it will affect other tests
+        spark.conf().unset("spark.paimon.data-file.prefix");
     }
 
     @Test
@@ -317,6 +320,9 @@ public class SparkWriteITCase {
         spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
         FileStatus[] files2 = fileIO.listStatus(new Path(tabLocation, 
"bucket-0"));
         Assertions.assertEquals(1, dataFileCount(files2, "test-changelog-"));
+
+        // reset config, otherwise it will affect other tests
+        spark.conf().unset("spark.paimon.changelog-file.prefix");
     }
 
     @Test
@@ -333,6 +339,97 @@ public class SparkWriteITCase {
         Assertions.assertTrue(fileIO.exists(new Path(tabLocation, 
"c=aa/_SUCCESS")));
     }
 
+    @Test
+    public void testDataFileSuffixName() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING)"
+                        + " TBLPROPERTIES ("
+                        + "'bucket' = '1', "
+                        + "'primary-key'='a', "
+                        + "'write-only' = 'true', "
+                        + "'file.format' = 'parquet', "
+                        + "'file.compression' = 'zstd')");
+
+        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+        spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
+
+        // enable file suffix
+        spark.conf().set("spark.paimon.file.suffix.include.compression", true);
+        spark.sql("INSERT INTO T VALUES (3, 3, 'cc')");
+        spark.sql("INSERT INTO T VALUES (4, 4, 'dd')");
+
+        List<Row> data2 = spark.sql("SELECT * FROM T order by 
a").collectAsList();
+        assertThat(data2.toString()).isEqualTo("[[1,1,aa], [2,2,bb], [3,3,cc], 
[4,4,dd]]");
+
+        // check files suffix name
+        List<String> files =
+                spark.sql("select file_path from 
`T$files`").collectAsList().stream()
+                        .map(x -> x.getString(0))
+                        .collect(Collectors.toList());
+        Assertions.assertEquals(4, files.size());
+
+        String defaultExtension = "." + "parquet";
+        String newExtension = "." + "zstd" + "." + "parquet";
+        // two data files end with ".parquet", two data files end with 
".zstd.parquet"
+        Assertions.assertEquals(
+                2,
+                files.stream()
+                        .filter(
+                                name ->
+                                        name.endsWith(defaultExtension)
+                                                && 
!name.endsWith(newExtension))
+                        .count());
+        Assertions.assertEquals(
+                2, files.stream().filter(name -> 
name.endsWith(newExtension)).count());
+
+        // reset config
+        spark.conf().unset("spark.paimon.file.suffix.include.compression");
+    }
+
+    @Test
+    public void testChangelogFileSuffixName() throws Exception {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT, c STRING) "
+                        + "TBLPROPERTIES ("
+                        + "'primary-key'='a', "
+                        + "'bucket' = '1', "
+                        + "'changelog-producer' = 'lookup', "
+                        + "'file.format' = 'parquet', "
+                        + "'file.compression' = 'zstd')");
+
+        FileStoreTable table = getTable("T");
+        Path tabLocation = table.location();
+        FileIO fileIO = table.fileIO();
+
+        spark.sql("INSERT INTO T VALUES (1, 1, 'aa')");
+
+        spark.conf().set("spark.paimon.file.suffix.include.compression", true);
+        spark.sql("INSERT INTO T VALUES (2, 2, 'bb')");
+
+        // collect changelog files
+        List<String> files =
+                Arrays.stream(fileIO.listStatus(new Path(tabLocation, 
"bucket-0")))
+                        .map(name -> name.getPath().getName())
+                        .filter(name -> name.startsWith("changelog-"))
+                        .collect(Collectors.toList());
+        String defaultExtension = "." + "parquet";
+        String newExtension = "." + "zstd" + "." + "parquet";
+        // one changelog file ends with ".parquet", one changelog file ends with 
".zstd.parquet"
+        Assertions.assertEquals(
+                1,
+                files.stream()
+                        .filter(
+                                name ->
+                                        name.endsWith(defaultExtension)
+                                                && 
!name.endsWith(newExtension))
+                        .count());
+        Assertions.assertEquals(
+                1, files.stream().filter(name -> 
name.endsWith(newExtension)).count());
+
+        // reset config
+        spark.conf().unset("spark.paimon.file.suffix.include.compression");
+    }
+
     protected static FileStoreTable getTable(String tableName) {
         return FileStoreTableFactory.create(
                 LocalFileIO.create(),

Reply via email to