Copilot commented on code in PR #6103:
URL: https://github.com/apache/paimon/pull/6103#discussion_r2292768107


##########
paimon-format/src/main/java/org/apache/paimon/format/BaseTextFileWriter.java:
##########
@@ -20,25 +20,39 @@
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.types.RowType;
 
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 
 /** Base class for text-based format writers that provides common 
functionality. */
 public abstract class BaseTextFileWriter implements FormatWriter {
 
     protected final PositionOutputStream outputStream;
+    private final OutputStream compressedStream;

Review Comment:
   The 'compressedStream' field is stored but never explicitly closed in the 
close() method. This could lead to resource leaks if the compressed stream has 
resources that need cleanup beyond the writer.



##########
paimon-format/src/main/java/org/apache/paimon/format/TextCompression.java:
##########
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.HadoopUtils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Optional;
+
+/** Utility class for handling text file compression and decompression using 
Hadoop codecs. */
+public class TextCompression {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(TextCompression.class);
+    /**
+     * Creates a compressed output stream using Hadoop's compression codecs.
+     *
+     * @param out The underlying output stream
+     * @param compression The compression format
+     * @param options Paimon options for Hadoop configuration
+     * @return Compressed output stream
+     * @throws IOException If compression stream creation fails
+     */
+    public static OutputStream createCompressedOutputStream(
+            PositionOutputStream out, CompressionType compression, Options 
options)
+            throws IOException {
+        Optional<CompressionCodec> codecOpt =
+                getCompressionCodecByCompression(compression, options);
+        if (codecOpt.isPresent()) {
+            return codecOpt.get().createOutputStream(out);
+        }
+        return out;
+    }
+
+    /**
+     * Creates a decompressed input stream using Hadoop's compression codecs.
+     *
+     * @param inputStream The underlying input stream
+     * @param filePath The file path (used to detect compression from 
extension)
+     * @param options Paimon options for Hadoop configuration
+     * @return Decompressed input stream
+     */
+    public static InputStream createDecompressedInputStream(
+            SeekableInputStream inputStream, Path filePath, Options options) {
+        try {
+            Configuration conf = HadoopUtils.getHadoopConfiguration(options);
+            CompressionCodecFactory codecFactory = new 
CompressionCodecFactory(conf);
+
+            Optional<CompressionCodec> codecOpt =
+                    Optional.ofNullable(
+                            codecFactory.getCodec(
+                                    new 
org.apache.hadoop.fs.Path(filePath.toString())));
+
+            if (!codecOpt.isPresent()) {
+                CompressionType compressionType =
+                        TextCompression.getTextCompressionType(
+                                options.get(CoreOptions.FILE_COMPRESSION), 
options);
+                codecOpt = getCompressionCodecByCompression(compressionType, 
options);
+            }
+            if (codecOpt.isPresent()) {
+                return codecOpt.get().createInputStream(inputStream);
+            }
+        } catch (Throwable e) {
+            LOG.warn("Failed to create decompressed for input stream, so use 
none", e);

Review Comment:
   The log message has a grammatical error: the word 'for' is extraneous. It 
should read "Failed to create decompressed input stream, so use none".
   ```suggestion
               LOG.warn("Failed to create decompressed input stream, so use 
none", e);
   ```



##########
paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java:
##########
@@ -682,7 +682,7 @@ public void testChangelogFileSuffixName() throws Exception {
                         .filter(name -> name.contains("changelog-"))
                         .collect(Collectors.toList());
         String defaultExtension = "." + "parquet";
-        String newExtension = "." + "zstd" + "." + "parquet";
+        String newExtension = "." + "zst" + "." + "parquet";
         // one changelog file end with ".parquet", one changelog file end with 
".zstd.parquet"

Review Comment:
   The comment on line 686 still refers to ".zstd.parquet" but the code now 
uses ".zst.parquet". Update the comment to match the code: `// one changelog 
file end with ".parquet", one changelog file end with ".zst.parquet"`
   ```suggestion
           // one changelog file end with ".parquet", one changelog file end 
with ".zst.parquet"
   ```



##########
paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java:
##########
@@ -638,7 +638,7 @@ public void testDataFileSuffixName() {
         Assertions.assertEquals(4, files.size());
 
         String defaultExtension = "." + "parquet";
-        String newExtension = "." + "zstd" + "." + "parquet";
+        String newExtension = "." + "zst" + "." + "parquet";
         // two data files end with ".parquet", two data file end with 
".zstd.parquet"

Review Comment:
   The comment on line 642 still refers to ".zstd.parquet" but the code now 
uses ".zst.parquet". Update the comment to match the code: `// two data files 
end with ".parquet", two data files end with ".zst.parquet"`
   ```suggestion
           // two data files end with ".parquet", two data files end with 
".zst.parquet"
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@paimon.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to