This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 8d65ce3343 [test] Add compress files tests for format table (#5529)
8d65ce3343 is described below
commit 8d65ce3343688e000468147379c4592c9f06cd69
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Apr 25 15:43:58 2025 +0800
[test] Add compress files tests for format table (#5529)
---
.../org/apache/paimon/utils/CompressUtils.java | 46 ++++++++++++++++++++++
.../hive/HiveCatalogFormatTableITCaseBase.java | 32 +++++++++++++++
.../paimon/spark/sql/FormatTableTestBase.scala | 28 +++++++++++++
3 files changed, 106 insertions(+)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java
b/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java
new file mode 100644
index 0000000000..2a4aa71b3d
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompressUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
/** Utilities for compressing files (currently gzip only). */
public class CompressUtils {

    private CompressUtils() {
        // utility class, no instances
    }

    /**
     * Compresses the file at {@code src} into a new gzip file at {@code dest}.
     *
     * <p>The source file is left untouched; any existing file at {@code dest} is overwritten.
     * All streams are closed even if copying fails.
     *
     * @param src path of the file to compress
     * @param dest path of the gzip file to create
     * @throws IOException if reading the source or writing the destination fails
     */
    public static void gzipCompressFile(String src, String dest) throws IOException {
        // try-with-resources guarantees both the gzip stream and the underlying
        // file stream are closed on every path (the original leaked them on exception)
        try (OutputStream fileOut = Files.newOutputStream(Paths.get(dest));
                GZIPOutputStream gzipOut = new GZIPOutputStream(fileOut)) {
            // Files.copy replaces the hand-rolled 1 KiB buffer loop
            Files.copy(Paths.get(src), gzipOut);
        }
    }
}
diff --git
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
index a817d76b1f..a9b3fb9ae0 100644
---
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
+++
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogFormatTableITCaseBase.java
@@ -18,13 +18,19 @@
package org.apache.paimon.hive;
+import org.apache.paimon.flink.FormatCatalogTable;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.utils.CompressUtils;
import com.klarna.hiverunner.HiveShell;
import com.klarna.hiverunner.annotations.HiveSQL;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Rule;
@@ -40,6 +46,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.time.LocalDateTime;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -215,6 +222,31 @@ public abstract class HiveCatalogFormatTableITCaseBase {
.containsExactlyInAnyOrder(
Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
+
+ // test compression
+ FormatCatalogTable catalogTable =
+ (FormatCatalogTable)
+ tEnv.getCatalog(tEnv.getCurrentCatalog())
+ .get()
+ .getTable(new
ObjectPath(tEnv.getCurrentDatabase(), tableName));
+ FormatTable table = catalogTable.table();
+ FileIO fileIO = table.fileIO();
+ String file =
+ Arrays.stream(fileIO.listStatus(new Path(table.location())))
+ .filter(status ->
!status.getPath().getName().startsWith("."))
+ .findFirst()
+ .get()
+ .getPath()
+ .toUri()
+ .getPath();
+ CompressUtils.gzipCompressFile(file, file + ".gz");
+ fileIO.deleteQuietly(new Path(file));
+
+ assertThat(collect(String.format("SELECT * FROM %s", tableName)))
+ .containsExactlyInAnyOrder(
+ Row.of(1, LocalDateTime.parse("2025-03-17T10:15:30")),
+ Row.of(2, LocalDateTime.parse("2025-03-18T10:15:30")));
+
tEnv.executeSql(
String.format(
"INSERT INTO %s VALUES (3, CAST('2025-03-19
10:15:30' AS TIMESTAMP))",
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
index 6bd54a26e3..6243d6b181 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -22,9 +22,13 @@ import org.apache.paimon.catalog.Identifier
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonHiveTestBase
import org.apache.paimon.table.FormatTable
+import org.apache.paimon.utils.{CompressUtils, FileIOUtils, FileUtils}
import org.apache.spark.sql.Row
+import java.io.{File, FileInputStream, FileOutputStream}
+import java.util.zip.GZIPOutputStream
+
abstract class FormatTableTestBase extends PaimonHiveTestBase {
override protected def beforeEach(): Unit = {
@@ -55,4 +59,28 @@ abstract class FormatTableTestBase extends
PaimonHiveTestBase {
}
}
}
+
+ test("Format table: read compressed files") {
+ for (format <- Seq("csv", "json")) {
+ withTable("compress_t") {
+ sql(s"CREATE TABLE compress_t (a INT, b INT, c INT) USING $format")
+ sql("INSERT INTO compress_t VALUES (1, 2, 3)")
+ val table =
+ paimonCatalog
+ .getTable(Identifier.create("default", "compress_t"))
+ .asInstanceOf[FormatTable]
+ val fileIO = table.fileIO()
+ val file = fileIO
+ .listStatus(new Path(table.location()))
+ .filter(file => !file.getPath.getName.startsWith("."))
+ .head
+ .getPath
+ .toUri
+ .getPath
+ CompressUtils.gzipCompressFile(file, file + ".gz")
+ fileIO.deleteQuietly(new Path(file))
+ checkAnswer(sql("SELECT * FROM compress_t"), Row(1, 2, 3))
+ }
+ }
+ }
}