This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 87e99d129 PARQUET-2454: Invoking flush before closing the output stream (#1309)
87e99d129 is described below

commit 87e99d12942d923f3c7f7af6a10254bd92fe8aed
Author: Asif <[email protected]>
AuthorDate: Mon Apr 22 19:40:30 2024 -0700

    PARQUET-2454: Invoking flush before closing the output stream (#1309)
---
 .../main/java/org/apache/parquet/cli/BaseCommand.java |  6 ++----
 .../parquet/bytes/LittleEndianDataOutputStream.java   | 11 +++++++----
 .../parquet/io/DelegatingPositionOutputStream.java    |  6 +++++-
 .../java/org/apache/parquet/io/LocalOutputFile.java   |  7 ++++++-
 .../org/apache/parquet/hadoop/ParquetFileWriter.java  | 14 +++++++-------
 .../org/apache/parquet/hadoop/ParquetInputSplit.java  | 19 ++++++++++---------
 .../parquet/hadoop/codec/ZstdCompressorStream.java    |  4 +++-
 .../hadoop/util/HadoopPositionOutputStream.java       |  4 +++-
 8 files changed, 43 insertions(+), 28 deletions(-)

diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index 132fa157a..b30c9432d 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -97,11 +97,9 @@ public abstract class BaseCommand implements Command, Configurable {
     if (filename == null || "-".equals(filename)) {
       console.info(content);
     } else {
-      FSDataOutputStream outgoing = create(filename);
-      try {
+      try (FSDataOutputStream outgoing = create(filename)) {
         outgoing.write(content.getBytes(StandardCharsets.UTF_8));
-      } finally {
-        outgoing.close();
+        outgoing.flush();
       }
     }
   }
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
index 44e6aa482..ef6c71bc8 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -20,12 +20,15 @@ package org.apache.parquet.bytes;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Based on DataOutputStream but in little endian and without the String/char 
methods
  */
 public class LittleEndianDataOutputStream extends OutputStream {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(LittleEndianDataOutputStream.class);
   private final OutputStream out;
 
   /**
@@ -208,10 +211,10 @@ public class LittleEndianDataOutputStream extends OutputStream {
   }
 
   public void close() {
-    try {
-      out.close();
-    } catch (IOException e) {
-      // swallow exception
+    try (OutputStream os = this.out) {
+      os.flush();
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) LOG.debug("Exception in flushing arrayOut 
before close", e);
     }
   }
 }
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
index 9e524282b..c982400e5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
@@ -35,7 +35,11 @@ public abstract class DelegatingPositionOutputStream extends PositionOutputStrea
 
   @Override
   public void close() throws IOException {
-    stream.close();
+    try (OutputStream os = this.stream) {
+      os.flush();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
index 9c823f92e..695f39226 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
@@ -20,6 +20,7 @@ package org.apache.parquet.io;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -71,7 +72,11 @@ public class LocalOutputFile implements OutputFile {
 
     @Override
     public void close() throws IOException {
-      stream.close();
+      try (OutputStream os = this.stream) {
+        os.flush();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 5344aa315..761f7a7be 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1655,9 +1655,8 @@ public class ParquetFileWriter implements AutoCloseable {
 
   @Override
   public void close() throws IOException {
-    try {
-      out.close();
-    } finally {
+    try (PositionOutputStream temp = out) {
+      temp.flush();
       if (crcAllocator != null) {
         crcAllocator.close();
       }
@@ -2049,10 +2048,11 @@ public class ParquetFileWriter implements AutoCloseable {
   @Deprecated
   private static void writeMetadataFile(Path outputPath, ParquetMetadata 
metadataFooter, FileSystem fs)
       throws IOException {
-    PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
-    metadata.write(MAGIC);
-    serializeFooter(metadataFooter, metadata, null, new 
ParquetMetadataConverter());
-    metadata.close();
+    try (PositionOutputStream metadata = 
HadoopStreams.wrap(fs.create(outputPath))) {
+      metadata.write(MAGIC);
+      serializeFooter(metadataFooter, metadata, null, new 
ParquetMetadataConverter());
+      metadata.flush();
+    }
   }
 
   /**
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
index e851f3ed9..3c65ef7c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
@@ -266,17 +266,18 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   @Override
   public void write(DataOutput hout) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
-    super.write(out);
-    out.writeLong(end);
-    out.writeBoolean(rowGroupOffsets != null);
-    if (rowGroupOffsets != null) {
-      out.writeInt(rowGroupOffsets.length);
-      for (long o : rowGroupOffsets) {
-        out.writeLong(o);
+    try (DataOutputStream out = new DataOutputStream(new 
GZIPOutputStream(baos))) {
+      super.write(out);
+      out.writeLong(end);
+      out.writeBoolean(rowGroupOffsets != null);
+      if (rowGroupOffsets != null) {
+        out.writeInt(rowGroupOffsets.length);
+        for (long o : rowGroupOffsets) {
+          out.writeLong(o);
+        }
       }
+      out.flush();
     }
-    out.close();
     writeArray(hout, baos.toByteArray());
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
index 91bdfae9d..52458f217 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
@@ -64,6 +64,8 @@ public class ZstdCompressorStream extends CompressionOutputStream {
 
   @Override
   public void close() throws IOException {
-    zstdOutputStream.close();
+    try (ZstdOutputStream zos = this.zstdOutputStream) {
+      zos.flush();
+    }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
index 157ea62e1..df22e2377 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
@@ -61,6 +61,8 @@ public class HadoopPositionOutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
-    wrapped.close();
+    try (FSDataOutputStream fdos = wrapped) {
+      fdos.hflush();
+    }
   }
 }

Reply via email to