This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 87e99d129 PARQUET-2454: Invoking flush before closing the output stream (#1309)
87e99d129 is described below
commit 87e99d12942d923f3c7f7af6a10254bd92fe8aed
Author: Asif <[email protected]>
AuthorDate: Mon Apr 22 19:40:30 2024 -0700
PARQUET-2454: Invoking flush before closing the output stream (#1309)
---
.../main/java/org/apache/parquet/cli/BaseCommand.java | 6 ++----
.../parquet/bytes/LittleEndianDataOutputStream.java | 11 +++++++----
.../parquet/io/DelegatingPositionOutputStream.java | 6 +++++-
.../java/org/apache/parquet/io/LocalOutputFile.java | 7 ++++++-
.../org/apache/parquet/hadoop/ParquetFileWriter.java | 14 +++++++-------
.../org/apache/parquet/hadoop/ParquetInputSplit.java | 19 ++++++++++---------
.../parquet/hadoop/codec/ZstdCompressorStream.java | 4 +++-
.../hadoop/util/HadoopPositionOutputStream.java | 4 +++-
8 files changed, 43 insertions(+), 28 deletions(-)
diff --git a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
index 132fa157a..b30c9432d 100644
--- a/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
+++ b/parquet-cli/src/main/java/org/apache/parquet/cli/BaseCommand.java
@@ -97,11 +97,9 @@ public abstract class BaseCommand implements Command, Configurable {
     if (filename == null || "-".equals(filename)) {
       console.info(content);
     } else {
-      FSDataOutputStream outgoing = create(filename);
-      try {
+      try (FSDataOutputStream outgoing = create(filename)) {
         outgoing.write(content.getBytes(StandardCharsets.UTF_8));
-      } finally {
-        outgoing.close();
+        outgoing.flush();
       }
     }
   }
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
index 44e6aa482..ef6c71bc8 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -20,12 +20,15 @@ package org.apache.parquet.bytes;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Based on DataOutputStream but in little endian and without the String/char methods
  */
 public class LittleEndianDataOutputStream extends OutputStream {
 
+  private static final Logger LOG = LoggerFactory.getLogger(LittleEndianDataOutputStream.class);
   private final OutputStream out;
 
   /**
@@ -208,10 +211,10 @@ public class LittleEndianDataOutputStream extends OutputStream {
   }
 
   public void close() {
-    try {
-      out.close();
-    } catch (IOException e) {
-      // swallow exception
+    try (OutputStream os = this.out) {
+      os.flush();
+    } catch (Exception e) {
+      if (LOG.isDebugEnabled()) LOG.debug("Exception in flushing arrayOut before close", e);
     }
   }
 }
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
index 9e524282b..c982400e5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/DelegatingPositionOutputStream.java
@@ -35,7 +35,11 @@ public abstract class DelegatingPositionOutputStream extends PositionOutputStrea
 
   @Override
   public void close() throws IOException {
-    stream.close();
+    try (OutputStream os = this.stream) {
+      os.flush();
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
   }
 
   @Override
diff --git a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
index 9c823f92e..695f39226 100644
--- a/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
+++ b/parquet-common/src/main/java/org/apache/parquet/io/LocalOutputFile.java
@@ -20,6 +20,7 @@ package org.apache.parquet.io;
 
 import java.io.BufferedOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -71,7 +72,11 @@ public class LocalOutputFile implements OutputFile {
 
     @Override
     public void close() throws IOException {
-      stream.close();
+      try (OutputStream os = this.stream) {
+        os.flush();
+      } catch (Exception e) {
+        throw new IOException(e);
+      }
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 5344aa315..761f7a7be 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -1655,9 +1655,8 @@ public class ParquetFileWriter implements AutoCloseable {
 
   @Override
   public void close() throws IOException {
-    try {
-      out.close();
-    } finally {
+    try (PositionOutputStream temp = out) {
+      temp.flush();
       if (crcAllocator != null) {
         crcAllocator.close();
       }
@@ -2049,10 +2048,11 @@ public class ParquetFileWriter implements AutoCloseable {
   @Deprecated
   private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
       throws IOException {
-    PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath));
-    metadata.write(MAGIC);
-    serializeFooter(metadataFooter, metadata, null, new ParquetMetadataConverter());
-    metadata.close();
+    try (PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath))) {
+      metadata.write(MAGIC);
+      serializeFooter(metadataFooter, metadata, null, new ParquetMetadataConverter());
+      metadata.flush();
+    }
   }
 
   /**
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
index e851f3ed9..3c65ef7c0 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputSplit.java
@@ -266,17 +266,18 @@ public class ParquetInputSplit extends FileSplit implements Writable {
   @Override
   public void write(DataOutput hout) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos));
-    super.write(out);
-    out.writeLong(end);
-    out.writeBoolean(rowGroupOffsets != null);
-    if (rowGroupOffsets != null) {
-      out.writeInt(rowGroupOffsets.length);
-      for (long o : rowGroupOffsets) {
-        out.writeLong(o);
+    try (DataOutputStream out = new DataOutputStream(new GZIPOutputStream(baos))) {
+      super.write(out);
+      out.writeLong(end);
+      out.writeBoolean(rowGroupOffsets != null);
+      if (rowGroupOffsets != null) {
+        out.writeInt(rowGroupOffsets.length);
+        for (long o : rowGroupOffsets) {
+          out.writeLong(o);
+        }
       }
+      out.flush();
     }
-    out.close();
     writeArray(hout, baos.toByteArray());
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
index 91bdfae9d..52458f217 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/codec/ZstdCompressorStream.java
@@ -64,6 +64,8 @@ public class ZstdCompressorStream extends CompressionOutputStream {
 
   @Override
   public void close() throws IOException {
-    zstdOutputStream.close();
+    try (ZstdOutputStream zos = this.zstdOutputStream) {
+      zos.flush();
+    }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
index 157ea62e1..df22e2377 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/util/HadoopPositionOutputStream.java
@@ -61,6 +61,8 @@ public class HadoopPositionOutputStream extends PositionOutputStream {
 
   @Override
   public void close() throws IOException {
-    wrapped.close();
+    try (FSDataOutputStream fdos = wrapped) {
+      fdos.hflush();
+    }
   }
 }