This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d65ce3343 [test] Add compress files tests for format table (#5529)
8d65ce3343 is described below

commit 8d65ce3343688e000468147379c4592c9f06cd69
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 25 15:43:58 2025 +0800

    [test] Add compress files tests for format table (#5529)
---
 .../org/apache/paimon/utils/CompressUtils.java     | 46 ++++++++++++++++++++++
 .../hive/HiveCatalogFormatTableITCaseBase.java     | 32 +++++++++++++++
 .../paimon/spark/sql/FormatTableTestBase.scala     | 28 +++++++++++++
 3 files changed, 106 insertions(+)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java
new file mode 100644
index 0000000000..2a4aa71b3d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+/** Compress utils. */
+public class CompressUtils {
+
+    public static void gzipCompressFile(String src, String dest) throws 
IOException {
+        FileInputStream fis = new FileInputStream(src);
+        FileOutputStream fos = new FileOutputStream(dest);
+        GZIPOutputStream gzipOs = new GZIPOutputStream(fos);
+        byte[] buffer = new byte[1024];
+        int bytesRead;
+        while (true) {
+            bytesRead = fis.read(buffer);
+            if (bytesRead == -1) {
+                fis.close();
+                gzipOs.close();
+                return;
+            }
+
+            gzipOs.write(buffer, 0, bytesRead);
+        }
+    }
+}
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
index a817d76b1f..a9b3fb9ae0 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -18,13 +18,19 @@
 
 package org.apache.paimon.hive;
 
+import org.apache.paimon.flink.FormatCatalogTable;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.utils.CompressUtils;
 
 import com.klarna.hiverunner.HiveShell;
 import com.klarna.hiverunner.annotations.HiveSQL;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 import org.junit.Rule;
@@ -40,6 +46,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -215,6 +222,31 @@ public abstract class HiveCatalogFormatTableITCaseBase {
                 .containsExactlyInAnyOrder(
                         Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
                         Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
+
+        // test compression
+        FormatCatalogTable catalogTable =
+                (FormatCatalogTable)
+                        tEnv.getCatalog(tEnv.getCurrentCatalog())
+                                .get()
+                                .getTable(new 
ObjectPath(tEnv.getCurrentDatabase(), tableName));
+        FormatTable table = catalogTable.table();
+        FileIO fileIO = table.fileIO();
+        String file =
+                Arrays.stream(fileIO.listStatus(new Path(table.location())))
+                        .filter(status -> 
!status.getPath().getName().startsWith("."))
+                        .findFirst()
+                        .get()
+                        .getPath()
+                        .toUri()
+                        .getPath();
+        CompressUtils.gzipCompressFile(file, file + ".gz");
+        fileIO.deleteQuietly(new Path(file));
+
+        assertThat(collect(String.format("SELECT * FROM %s", tableName)))
+                .containsExactlyInAnyOrder(
+                        Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
+                        Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
+
         tEnv.executeSql(
                         String.format(
                                 "INSERT INTO %s VALUES (3, CAST('2025-03-19 
10:15:30' AS TIMESTAMP))",
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 6bd54a26e3..6243d6b181 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -22,9 +22,13 @@ import org.apache.paimon.catalog.Identifier
 import org.apache.paimon.fs.Path
 import org.apache.paimon.spark.PaimonHiveTestBase
 import org.apache.paimon.table.FormatTable
+import org.apache.paimon.utils.{CompressUtils, FileIOUtils, FileUtils}
 
 import org.apache.spark.sql.Row
 
+import java.io.{File, FileInputStream, FileOutputStream}
+import java.util.zip.GZIPOutputStream
+
 abstract class FormatTableTestBase extends PaimonHiveTestBase {
 
   override protected def beforeEach(): Unit = {
@@ -55,4 +59,28 @@ abstract class FormatTableTestBase extends 
PaimonHiveTestBase {
       }
     }
   }
+
+  test("Format table: read compressed files") {
+    for (format <- Seq("csv", "json")) {
+      withTable("compress_t") {
+        sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
+        sql("INSERT INTO compress_t VALUES (1, 2, 3)")
+        val table =
+          paimonCatalog
+            .getTable(Identifier.create("default", "compress_t"))
+            .asInstanceOf[FormatTable]
+        val fileIO = table.fileIO()
+        val file = fileIO
+          .listStatus(new Path(table.location()))
+          .filter(file => !file.getPath.getName.startsWith("."))
+          .head
+          .getPath
+          .toUri
+          .getPath
+        CompressUtils.gzipCompressFile(file, file + ".gz")
+        fileIO.deleteQuietly(new Path(file))
+        checkAnswer(sql("SELECT * FROM compress_t"), Row(1, 2, 3))
+      }
+    }
+  }
 }

Reply via email to