This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a9345e7b1 [core] test: add tests for compression (#6145)
1a9345e7b1 is described below

commit 1a9345e7b1755a3732808c2dc13725c2f54bb85d
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Tue Aug 26 15:50:55 2025 +0800

    [core] test: add tests for compression (#6145)
---
 .../paimon/format/text/HadoopCompressionUtils.java |  12 +-
 .../apache/paimon/format/TextCompressionTest.java  |  22 ++
 .../paimon/format/json/JsonCompressionTest.java    |   2 -
 .../format/text/HadoopCompressionUtilsTest.java    | 314 +++++++++++++++++++++
 4 files changed, 345 insertions(+), 5 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
index 1ff11bcc22..7ff2f97e17 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/text/HadoopCompressionUtils.java
@@ -58,10 +58,16 @@ public class HadoopCompressionUtils {
      * @param inputStream The underlying input stream
      * @param filePath The file path (used to detect compression from 
extension)
      * @return Decompressed input stream
+     * @throws IOException If decompression stream creation fails
      */
     public static InputStream createDecompressedInputStream(
-            SeekableInputStream inputStream, Path filePath) {
+            SeekableInputStream inputStream, Path filePath) throws IOException 
{
         try {
+            // Handle null filePath gracefully
+            if (filePath == null) {
+                return inputStream;
+            }
+
             CompressionCodecFactory codecFactory =
                     new CompressionCodecFactory(new Configuration(false));
 
@@ -71,7 +77,7 @@ public class HadoopCompressionUtils {
                 return codec.createInputStream(inputStream);
             }
             return inputStream;
-        } catch (Exception e) {
+        } catch (Exception | UnsatisfiedLinkError e) {
             throw new RuntimeException("Failed to create decompression 
stream", e);
         }
     }
@@ -97,7 +103,7 @@ public class HadoopCompressionUtils {
                     (CompressionCodec) 
codecClass.getDeclaredConstructor().newInstance();
             codec.createOutputStream(new java.io.ByteArrayOutputStream());
             return Optional.of(codec);
-        } catch (Exception e) {
+        } catch (Exception | UnsatisfiedLinkError e) {
             throw new RuntimeException("Failed to get compression codec", e);
         }
     }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java 
b/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
index e9e8fbc063..92bb45b222 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/TextCompressionTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -42,6 +43,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Base class for compression tests across different file formats. */
 public abstract class TextCompressionTest {
@@ -65,6 +67,26 @@ public abstract class TextCompressionTest {
     /** Returns the file extension for the format. */
     protected abstract String getFormatExtension();
 
+    /**
+     * Test case for when a file has a compression extension but the 
corresponding compression codec
+     * is not available or cannot be found.
+     */
+    @Test
+    void testWriteFileWithCompressionExtensionButCompressionNotFound() {
+        String fileName = "test_unsupported." + getFormatExtension() + ".xyz";
+        Options options = new Options();
+        options.set(CoreOptions.FILE_COMPRESSION, "xyz"); // Non-existent 
compression type
+
+        FileFormat format = createFileFormat(options);
+        Path filePath = new Path(tempDir.resolve(fileName).toString());
+        FileIO fileIO = new LocalFileIO();
+
+        FormatWriterFactory writerFactory = 
format.createWriterFactory(rowType);
+        assertThatThrownBy(
+                        () -> 
writerFactory.create(fileIO.newOutputStream(filePath, false), "xyz"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
     @Disabled // TODO fix dependencies
     @ParameterizedTest(name = "compression = {0}")
     @EnumSource(HadoopCompressionType.class)
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
index 0e6deca8ff..451a721df3 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/json/JsonCompressionTest.java
@@ -59,8 +59,6 @@ class JsonCompressionTest extends TextCompressionTest {
     void testJsonCompressionWithComplexData() throws IOException {
         // Test with complex JSON structures and different compression formats
         testCompressionRoundTrip(HadoopCompressionType.GZIP.value(), 
"test_complex_gzip.json.gz");
-        testCompressionRoundTrip(
-                HadoopCompressionType.DEFLATE.value(), 
"test_complex_deflate.json.deflate");
         testCompressionRoundTrip(HadoopCompressionType.NONE.value(), 
"test_complex_none.json");
     }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java
new file mode 100644
index 0000000000..0ff9ecd10e
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/text/HadoopCompressionUtilsTest.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.text;
+
+import org.apache.paimon.format.HadoopCompressionType;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link HadoopCompressionUtils}. */
+class HadoopCompressionUtilsTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private static final String TEST_DATA = "This is test data for 
compression.";
+
+    @Test
+    void testCreateCompressedOutputStreamWithNoneCompression() throws 
IOException {
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        TestPositionOutputStream positionOutputStream =
+                new TestPositionOutputStream(byteArrayOutputStream);
+
+        OutputStream result =
+                HadoopCompressionUtils.createCompressedOutputStream(
+                        positionOutputStream, 
HadoopCompressionType.NONE.value());
+
+        assertThat(result).isSameAs(positionOutputStream);
+    }
+
+    @Test
+    void testCreateCompressedOutputStreamWithInvalidCompression() {
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        TestPositionOutputStream positionOutputStream =
+                new TestPositionOutputStream(byteArrayOutputStream);
+
+        assertThatThrownBy(
+                        () ->
+                                
HadoopCompressionUtils.createCompressedOutputStream(
+                                        positionOutputStream, "invalid"))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testCreateDecompressedInputStreamWithNoCompression() throws 
IOException {
+        byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+        TestSeekableInputStream seekableInputStream = new 
TestSeekableInputStream(testData);
+        Path filePath = new Path("test.txt");
+
+        InputStream result =
+                
HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, 
filePath);
+
+        // For uncompressed files, should return the original stream
+        assertThat(result).isSameAs(seekableInputStream);
+    }
+
+    @Test
+    void testCreateDecompressedInputStreamWithGzipFile() throws IOException {
+        byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+        TestSeekableInputStream seekableInputStream = new 
TestSeekableInputStream(testData);
+        Path filePath = new Path("test.txt.gz");
+
+        InputStream result =
+                
HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, 
filePath);
+
+        assertThat(result).isNotNull();
+        // For compressed files, should return a different stream 
(decompression wrapper)
+        // Note: The actual decompression behavior depends on Hadoop codecs 
being available
+    }
+
+    @ParameterizedTest
+    @EnumSource(HadoopCompressionType.class)
+    void testCreateCompressedOutputStreamWithAvailableCompressions(
+            HadoopCompressionType compressionType) throws IOException {
+        if (compressionType.hadoopCodecClassName() == null) {
+            return; // Skip types without codec class names
+        }
+
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        TestPositionOutputStream positionOutputStream =
+                new TestPositionOutputStream(byteArrayOutputStream);
+
+        try {
+            OutputStream compressedStream =
+                    HadoopCompressionUtils.createCompressedOutputStream(
+                            positionOutputStream, compressionType.value());
+
+            assertThat(compressedStream).isNotNull();
+
+            // Write and close to ensure compression happens
+            compressedStream.write(TEST_DATA.getBytes(StandardCharsets.UTF_8));
+            compressedStream.close();
+
+            // Verify that some data was written
+            assertThat(byteArrayOutputStream.toByteArray()).isNotEmpty();
+        } catch (Exception e) {
+            // Skip compression type if not available
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = HadoopCompressionType.class)
+    void testCreateDecompressedInputStreamWithAvailableExtensions(
+            HadoopCompressionType compressionType) throws IOException {
+        if (compressionType.fileExtension() == null) {
+            return; // Skip types without file extensions
+        }
+
+        byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+        TestSeekableInputStream seekableInputStream = new 
TestSeekableInputStream(testData);
+        Path filePath = new Path("test.txt." + 
compressionType.fileExtension());
+
+        try {
+            InputStream result =
+                    HadoopCompressionUtils.createDecompressedInputStream(
+                            seekableInputStream, filePath);
+            assertThat(result).isNotNull();
+        } catch (Exception e) {
+            // Skip compression type if not available
+        }
+    }
+
+    @Test
+    void testRoundTripCompressionDecompression() throws IOException {
+        // Test with actual file I/O using GZIP which is most commonly 
available
+        java.nio.file.Path testFile = tempDir.resolve("test_roundtrip.txt.gz");
+        LocalFileIO fileIO = new LocalFileIO();
+        Path paimonPath = new Path(testFile.toString());
+
+        // Write compressed data
+        try (PositionOutputStream outputStream = 
fileIO.newOutputStream(paimonPath, false);
+                OutputStream compressedStream =
+                        HadoopCompressionUtils.createCompressedOutputStream(
+                                outputStream, 
HadoopCompressionType.GZIP.value())) {
+            compressedStream.write(TEST_DATA.getBytes(StandardCharsets.UTF_8));
+        }
+
+        // Verify file was created and has content
+        assertThat(Files.exists(testFile)).isTrue();
+        assertThat(Files.size(testFile)).isGreaterThan(0);
+
+        // Read decompressed data
+        try (SeekableInputStream inputStream = 
fileIO.newInputStream(paimonPath);
+                InputStream decompressedStream =
+                        HadoopCompressionUtils.createDecompressedInputStream(
+                                inputStream, paimonPath)) {
+
+            byte[] buffer = new byte[TEST_DATA.length() * 2]; // Extra space
+            int bytesRead = decompressedStream.read(buffer);
+
+            String decompressedData = new String(buffer, 0, bytesRead, 
StandardCharsets.UTF_8);
+            assertThat(decompressedData).isEqualTo(TEST_DATA);
+        }
+    }
+
+    @Test
+    void testCreateCompressedOutputStreamWithNullCompression() {
+        ByteArrayOutputStream byteArrayOutputStream = new 
ByteArrayOutputStream();
+        TestPositionOutputStream positionOutputStream =
+                new TestPositionOutputStream(byteArrayOutputStream);
+
+        assertThatThrownBy(
+                        () ->
+                                
HadoopCompressionUtils.createCompressedOutputStream(
+                                        positionOutputStream, null))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void testCreateDecompressedInputStreamWithNullPath() {
+        byte[] testData = TEST_DATA.getBytes(StandardCharsets.UTF_8);
+        TestSeekableInputStream seekableInputStream = new 
TestSeekableInputStream(testData);
+
+        try {
+            InputStream result =
+                    
HadoopCompressionUtils.createDecompressedInputStream(seekableInputStream, null);
+            // Should handle null path gracefully and return original stream
+            assertThat(result).isNotNull();
+        } catch (IOException e) {
+            // Null path may cause IOException, which is acceptable behavior
+            assertThat(e).hasMessageContaining("Failed to create decompression 
stream");
+        }
+    }
+
+    private static class TestPositionOutputStream extends PositionOutputStream 
{
+        private final ByteArrayOutputStream delegate;
+        private long position = 0;
+
+        public TestPositionOutputStream(ByteArrayOutputStream delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return position;
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            delegate.write(b);
+            position++;
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            delegate.write(b);
+            position += b.length;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            delegate.write(b, off, len);
+            position += len;
+        }
+
+        @Override
+        public void flush() throws IOException {
+            delegate.flush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            delegate.close();
+        }
+    }
+
+    private static class TestSeekableInputStream extends SeekableInputStream {
+        private final ByteArrayInputStream delegate;
+        private long position = 0;
+
+        public TestSeekableInputStream(byte[] data) {
+            this.delegate = new ByteArrayInputStream(data);
+        }
+
+        @Override
+        public void seek(long pos) throws IOException {
+            delegate.reset();
+            long skipped = delegate.skip(pos);
+            if (skipped != pos) {
+                throw new IOException("Could not seek to position " + pos);
+            }
+            position = pos;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return position;
+        }
+
+        @Override
+        public int read() throws IOException {
+            int result = delegate.read();
+            if (result != -1) {
+                position++;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b) throws IOException {
+            int result = delegate.read(b);
+            if (result != -1) {
+                position += result;
+            }
+            return result;
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            int result = delegate.read(b, off, len);
+            if (result != -1) {
+                position += result;
+            }
+            return result;
+        }
+
+        @Override
+        public void close() throws IOException {
+            delegate.close();
+        }
+    }
+}

Reply via email to