This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-java.git
The following commit(s) were added to refs/heads/master by this push:
new 2bcd2bde7 GH-3350: Avoid flushing data to cloud when exception is thrown (#3351)
2bcd2bde7 is described below
commit 2bcd2bde74771663d14a8429ed1dc6e5026766e6
Author: Jiayi-Wang-db <[email protected]>
AuthorDate: Mon Nov 3 18:22:34 2025 +0100
GH-3350: Avoid flushing data to cloud when exception is thrown (#3351)
---
.../hadoop/InternalParquetRecordWriter.java | 4 +
.../apache/parquet/hadoop/ParquetFileWriter.java | 865 +++++++++++----------
.../apache/parquet/hadoop/TestParquetWriter.java | 40 +
3 files changed, 517 insertions(+), 392 deletions(-)
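
The fix applies one pattern throughout ParquetFileWriter: every write path is wrapped so that an escaping IOException marks the writer as aborted, and close() then skips the final flush, so a half-written file is never pushed to remote storage. Below is a minimal, self-contained sketch of that pattern; AbortAwareWriter and its members are illustrative names only and are not part of the patch.

import java.io.IOException;
import java.io.OutputStream;

// Sketch of the abort-on-failure pattern used by this change (hypothetical class).
final class AbortAwareWriter implements AutoCloseable {

  @FunctionalInterface
  interface IORunnable {
    void run() throws IOException;
  }

  private final OutputStream out;
  private boolean aborted;
  private boolean closed;

  AbortAwareWriter(OutputStream out) {
    this.out = out;
  }

  // Run an I/O action; if it fails, remember that the file is now incomplete.
  private void withAbortOnFailure(IORunnable action) throws IOException {
    try {
      action.run();
    } catch (IOException e) {
      aborted = true;
      throw e;
    }
  }

  void write(byte[] data) throws IOException {
    withAbortOnFailure(() -> out.write(data));
  }

  // Mark the writer as aborted so close() will not push incomplete data.
  void abort() {
    aborted = true;
  }

  @Override
  public void close() throws IOException {
    if (closed) {
      return;
    }
    closed = true;
    if (!aborted) {
      // Only a successful writer flushes and closes the stream; for cloud-backed
      // streams this is what triggers the upload of the buffered bytes.
      out.flush();
      out.close();
    }
  }

  public static void main(String[] args) {
    // An output stream that fails on write, standing in for a flaky upload.
    OutputStream failing = new OutputStream() {
      @Override
      public void write(int b) throws IOException {
        throw new IOException("simulated upload failure");
      }

      @Override
      public void flush() {
        System.out.println("flush() called - buffered data would be uploaded");
      }
    };

    try (AbortAwareWriter writer = new AbortAwareWriter(failing)) {
      writer.write("row group bytes".getBytes());
    } catch (IOException e) {
      // The failed write marked the writer as aborted, so close() did not call
      // flush(); nothing was pushed for the half-written file.
      System.out.println("write failed: " + e.getMessage());
    }
  }
}

In the actual patch the same idea appears as the withAbortOnFailure helpers, the new abort() method, and the aborted check in ParquetFileWriter.close().
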
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index f29628680..41b068d01 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -129,6 +129,7 @@ class InternalParquetRecordWriter<T> {
if (!closed) {
try {
if (aborted) {
+ parquetFileWriter.abort();
return;
}
flushRowGroupToStore();
@@ -140,6 +141,9 @@ class InternalParquetRecordWriter<T> {
}
finalMetadata.putAll(finalWriteContext.getExtraMetaData());
parquetFileWriter.end(finalMetadata);
+ } catch (Exception e) {
+ parquetFileWriter.abort();
+ throw e;
} finally {
AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter);
closed = true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 4d17a1d6e..82f4577b8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -173,6 +173,7 @@ public class ParquetFileWriter implements AutoCloseable {
// set when end is called
private ParquetMetadata footer = null;
+ private boolean aborted;
private boolean closed;
private final CRC32 crc;
@@ -335,6 +336,34 @@ public class ParquetFileWriter implements AutoCloseable {
ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
+ @FunctionalInterface
+ interface IOCallable<T> {
+ T call() throws IOException;
+ }
+
+ private <T> T withAbortOnFailure(IOCallable<T> action) throws IOException {
+ try {
+ return action.call();
+ } catch (IOException e) {
+ aborted = true;
+ throw e;
+ }
+ }
+
+ @FunctionalInterface
+ interface IORunnable {
+ void run() throws IOException;
+ }
+
+ private void withAbortOnFailure(IORunnable action) throws IOException {
+ try {
+ action.run();
+ } catch (IOException e) {
+ aborted = true;
+ throw e;
+ }
+ }
+
/**
* @param file OutputFile to create or overwrite
* @param schema the schema of the data
@@ -565,13 +594,15 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void start() throws IOException {
- state = state.start();
- LOG.debug("{}: start", out.getPos());
- byte[] magic = MAGIC;
- if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
- magic = EFMAGIC;
- }
- out.write(magic);
+ withAbortOnFailure(() -> {
+ state = state.start();
+ LOG.debug("{}: start", out.getPos());
+ byte[] magic = MAGIC;
+ if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+ magic = EFMAGIC;
+ }
+ out.write(magic);
+ });
}
public InternalFileEncryptor getEncryptor() {
@@ -585,19 +616,21 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void startBlock(long recordCount) throws IOException {
- state = state.startBlock();
- LOG.debug("{}: start block", out.getPos());
- // out.write(MAGIC); // TODO: add a magic delimiter
+ withAbortOnFailure(() -> {
+ state = state.startBlock();
+ LOG.debug("{}: start block", out.getPos());
+ // out.write(MAGIC); // TODO: add a magic delimiter
- alignment.alignForRowGroup(out);
+ alignment.alignForRowGroup(out);
- currentBlock = new BlockMetaData();
- currentRecordCount = recordCount;
+ currentBlock = new BlockMetaData();
+ currentRecordCount = recordCount;
- currentColumnIndexes = new ArrayList<>();
- currentOffsetIndexes = new ArrayList<>();
+ currentColumnIndexes = new ArrayList<>();
+ currentOffsetIndexes = new ArrayList<>();
- currentBloomFilters = new HashMap<>();
+ currentBloomFilters = new HashMap<>();
+ });
}
/**
@@ -610,28 +643,31 @@ public class ParquetFileWriter implements AutoCloseable {
*/
public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName)
throws IOException {
- state = state.startColumn();
- encodingStatsBuilder.clear();
- currentEncodings = new HashSet<Encoding>();
- currentChunkPath = ColumnPath.get(descriptor.getPath());
- currentChunkType = descriptor.getPrimitiveType();
- currentChunkCodec = compressionCodecName;
- currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = -1;
- compressedLength = 0;
- uncompressedLength = 0;
- // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
- currentStatistics = null;
- currentSizeStatistics = SizeStatistics.newBuilder(
- descriptor.getPrimitiveType(),
- descriptor.getMaxRepetitionLevel(),
- descriptor.getMaxDefinitionLevel())
- .build();
- currentGeospatialStatistics =
- GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build();
-
- columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
- offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ withAbortOnFailure(() -> {
+ state = state.startColumn();
+ encodingStatsBuilder.clear();
+ currentEncodings = new HashSet<Encoding>();
+ currentChunkPath = ColumnPath.get(descriptor.getPath());
+ currentChunkType = descriptor.getPrimitiveType();
+ currentChunkCodec = compressionCodecName;
+ currentChunkValueCount = valueCount;
+ currentChunkFirstDataPage = -1;
+ compressedLength = 0;
+ uncompressedLength = 0;
+ // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed
+ // one
+ currentStatistics = null;
+ currentSizeStatistics = SizeStatistics.newBuilder(
+ descriptor.getPrimitiveType(),
+ descriptor.getMaxRepetitionLevel(),
+ descriptor.getMaxDefinitionLevel())
+ .build();
+ currentGeospatialStatistics = GeospatialStatistics.newBuilder(descriptor.getPrimitiveType())
+ .build();
+
+ columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
+ offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ });
}
/**
@@ -641,45 +677,51 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
- writeDictionaryPage(dictionaryPage, null, null);
+ withAbortOnFailure(() -> {
+ writeDictionaryPage(dictionaryPage, null, null);
+ });
}
public void writeDictionaryPage(
DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
- state = state.write();
- LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
- currentChunkDictionaryPageOffset = out.getPos();
- int uncompressedSize = dictionaryPage.getUncompressedSize();
- int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
- if (pageWriteChecksumEnabled) {
- crc.reset();
- crcUpdate(dictionaryPage.getBytes());
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- (int) crc.getValue(),
- out,
- headerBlockEncryptor,
- AAD);
- } else {
- metadataConverter.writeDictionaryPageHeader(
- uncompressedSize,
- compressedPageSize,
- dictionaryPage.getDictionarySize(),
- dictionaryPage.getEncoding(),
- out,
- headerBlockEncryptor,
- AAD);
- }
- long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
- this.uncompressedLength += uncompressedSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
- dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
- encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
- currentEncodings.add(dictionaryPage.getEncoding());
+ withAbortOnFailure(() -> {
+ state = state.write();
+ LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
+ currentChunkDictionaryPageOffset = out.getPos();
+ int uncompressedSize = dictionaryPage.getUncompressedSize();
+ int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crcUpdate(dictionaryPage.getBytes());
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ (int) crc.getValue(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ } else {
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ out,
+ headerBlockEncryptor,
+ AAD);
+ }
+ long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+ this.uncompressedLength += uncompressedSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
+ dictionaryPage
+ .getBytes()
+ .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+ encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+ currentEncodings.add(dictionaryPage.getEncoding());
+ });
}
/**
@@ -871,22 +913,24 @@ public class ParquetFileWriter implements AutoCloseable {
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- long beforeHeader = out.getPos();
- innerWriteDataPage(
- valueCount,
- uncompressedPageSize,
- bytes,
- statistics,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- metadataBlockEncryptor,
- pageHeaderAAD,
- sizeStatistics);
- offsetIndexBuilder.add(
- toIntWithCheck(out.getPos() - beforeHeader, "page"),
- rowCount,
- sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ withAbortOnFailure(() -> {
+ long beforeHeader = out.getPos();
+ innerWriteDataPage(
+ valueCount,
+ uncompressedPageSize,
+ bytes,
+ statistics,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ metadataBlockEncryptor,
+ pageHeaderAAD,
+ sizeStatistics);
+ offsetIndexBuilder.add(
+ toIntWithCheck(out.getPos() - beforeHeader, "page"),
+ rowCount,
+ sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ });
}
private void innerWriteDataPage(
@@ -978,51 +1022,53 @@ public class ParquetFileWriter implements AutoCloseable {
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- state = state.write();
- long beforeHeader = out.getPos();
- if (currentChunkFirstDataPage < 0) {
- currentChunkFirstDataPage = beforeHeader;
- }
- LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
- int compressedPageSize = toIntWithCheck(bytes.size(), "page");
- if (pageWriteChecksumEnabled) {
- crc.reset();
- crcUpdate(bytes);
- metadataConverter.writeDataPageV1Header(
- uncompressedPageSize,
- compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- (int) crc.getValue(),
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- } else {
- metadataConverter.writeDataPageV1Header(
- uncompressedPageSize,
- compressedPageSize,
- valueCount,
- rlEncoding,
- dlEncoding,
- valuesEncoding,
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- }
- long headerSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedPageSize + headerSize;
- this.compressedLength += compressedPageSize + headerSize;
- LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
- bytes.writeAllTo(out);
+ withAbortOnFailure(() -> {
+ state = state.write();
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
+ LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+ int compressedPageSize = toIntWithCheck(bytes.size(), "page");
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crcUpdate(bytes);
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize,
+ compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ } else {
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize,
+ compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ }
+ long headerSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedPageSize + headerSize;
+ this.compressedLength += compressedPageSize + headerSize;
+ LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
+ bytes.writeAllTo(out);
- mergeColumnStatistics(statistics, sizeStatistics);
+ mergeColumnStatistics(statistics, sizeStatistics);
- encodingStatsBuilder.addDataEncoding(valuesEncoding);
- currentEncodings.add(rlEncoding);
- currentEncodings.add(dlEncoding);
- currentEncodings.add(valuesEncoding);
+ encodingStatsBuilder.addDataEncoding(valuesEncoding);
+ currentEncodings.add(rlEncoding);
+ currentEncodings.add(dlEncoding);
+ currentEncodings.add(valuesEncoding);
+ });
}
/**
@@ -1297,76 +1343,79 @@ public class ParquetFileWriter implements AutoCloseable {
byte[] pageHeaderAAD,
SizeStatistics sizeStatistics)
throws IOException {
- state = state.write();
- int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
- int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
+ withAbortOnFailure(() -> {
+ state = state.write();
+ int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels");
+ int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels");
- int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page");
+ int compressedSize =
+ toIntWithCheck(bytes.size() + repetitionLevels.size() + definitionLevels.size(), "page");
- int uncompressedSize =
- toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
+ int uncompressedSize =
+ toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page");
- long beforeHeader = out.getPos();
- if (currentChunkFirstDataPage < 0) {
- currentChunkFirstDataPage = beforeHeader;
- }
-
- if (pageWriteChecksumEnabled) {
- crc.reset();
- if (repetitionLevels.size() > 0) {
- crcUpdate(repetitionLevels);
- }
- if (definitionLevels.size() > 0) {
- crcUpdate(definitionLevels);
+ long beforeHeader = out.getPos();
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
}
- if (bytes.size() > 0) {
- crcUpdate(bytes);
+
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ if (repetitionLevels.size() > 0) {
+ crcUpdate(repetitionLevels);
+ }
+ if (definitionLevels.size() > 0) {
+ crcUpdate(definitionLevels);
+ }
+ if (bytes.size() > 0) {
+ crcUpdate(bytes);
+ }
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed,
+ (int) crc.getValue(),
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
+ } else {
+ metadataConverter.writeDataPageV2Header(
+ uncompressedSize,
+ compressedSize,
+ valueCount,
+ nullCount,
+ rowCount,
+ dataEncoding,
+ rlByteLength,
+ dlByteLength,
+ compressed,
+ out,
+ metadataBlockEncryptor,
+ pageHeaderAAD);
}
- metadataConverter.writeDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- compressed,
- (int) crc.getValue(),
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- } else {
- metadataConverter.writeDataPageV2Header(
- uncompressedSize,
- compressedSize,
- valueCount,
- nullCount,
- rowCount,
- dataEncoding,
- rlByteLength,
- dlByteLength,
- compressed,
- out,
- metadataBlockEncryptor,
- pageHeaderAAD);
- }
- long headersSize = out.getPos() - beforeHeader;
- this.uncompressedLength += uncompressedSize + headersSize;
- this.compressedLength += compressedSize + headersSize;
+ long headersSize = out.getPos() - beforeHeader;
+ this.uncompressedLength += uncompressedSize + headersSize;
+ this.compressedLength += compressedSize + headersSize;
- mergeColumnStatistics(statistics, sizeStatistics);
+ mergeColumnStatistics(statistics, sizeStatistics);
- currentEncodings.add(dataEncoding);
- encodingStatsBuilder.addDataEncoding(dataEncoding);
+ currentEncodings.add(dataEncoding);
+ encodingStatsBuilder.addDataEncoding(dataEncoding);
- BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out);
+ BytesInput.concat(repetitionLevels, definitionLevels, bytes).writeAllTo(out);
- offsetIndexBuilder.add(
- toIntWithCheck(out.getPos() - beforeHeader, "page"),
- rowCount,
- sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ offsetIndexBuilder.add(
+ toIntWithCheck(out.getPos() - beforeHeader, "page"),
+ rowCount,
+ sizeStatistics != null ? sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+ });
}
private void crcUpdate(BytesInput bytes) {
@@ -1457,58 +1506,61 @@ public class ParquetFileWriter implements AutoCloseable {
int columnOrdinal,
byte[] fileAAD)
throws IOException {
- startColumn(descriptor, valueCount, compressionCodecName);
-
- state = state.write();
- if (dictionaryPage != null) {
- byte[] dictonaryPageHeaderAAD = null;
- if (null != headerBlockEncryptor) {
- dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
- fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+ withAbortOnFailure(() -> {
+ startColumn(descriptor, valueCount, compressionCodecName);
+
+ state = state.write();
+ if (dictionaryPage != null) {
+ byte[] dictonaryPageHeaderAAD = null;
+ if (null != headerBlockEncryptor) {
+ dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
+ fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
+ }
+ writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
}
- writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictonaryPageHeaderAAD);
- }
- if (bloomFilter != null) {
- // write bloom filter if one of data pages is not dictionary encoded
- boolean isWriteBloomFilter = false;
- for (Encoding encoding : dataEncodings) {
- // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in parquet v2
- if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
- isWriteBloomFilter = true;
- break;
+ if (bloomFilter != null) {
+ // write bloom filter if one of data pages is not dictionary encoded
+ boolean isWriteBloomFilter = false;
+ for (Encoding encoding : dataEncodings) {
+ // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, `RLE_DICTIONARY` is used in
+ // parquet v2
+ if (encoding != Encoding.PLAIN_DICTIONARY && encoding != Encoding.RLE_DICTIONARY) {
+ isWriteBloomFilter = true;
+ break;
+ }
+ }
+ if (isWriteBloomFilter) {
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ } else {
+ LOG.info(
+ "No need to write bloom filter because column {} data pages are
all encoded as dictionary.",
+ descriptor.getPath());
}
}
- if (isWriteBloomFilter) {
- currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
- } else {
- LOG.info(
- "No need to write bloom filter because column {} data pages are
all encoded as dictionary.",
- descriptor.getPath());
+ LOG.debug("{}: write data pages", out.getPos());
+ long headersSize = bytes.size() - compressedTotalPageSize;
+ this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+ this.compressedLength += compressedTotalPageSize + headersSize;
+ LOG.debug("{}: write data pages content", out.getPos());
+ currentChunkFirstDataPage = out.getPos();
+ bytes.writeAllTo(out);
+ encodingStatsBuilder.addDataEncodings(dataEncodings);
+ if (rlEncodings.isEmpty()) {
+ encodingStatsBuilder.withV2Pages();
}
- }
- LOG.debug("{}: write data pages", out.getPos());
- long headersSize = bytes.size() - compressedTotalPageSize;
- this.uncompressedLength += uncompressedTotalPageSize + headersSize;
- this.compressedLength += compressedTotalPageSize + headersSize;
- LOG.debug("{}: write data pages content", out.getPos());
- currentChunkFirstDataPage = out.getPos();
- bytes.writeAllTo(out);
- encodingStatsBuilder.addDataEncodings(dataEncodings);
- if (rlEncodings.isEmpty()) {
- encodingStatsBuilder.withV2Pages();
- }
- currentEncodings.addAll(rlEncodings);
- currentEncodings.addAll(dlEncodings);
- currentEncodings.addAll(dataEncodings);
- currentStatistics = totalStats;
- currentSizeStatistics = totalSizeStats;
- currentGeospatialStatistics = totalGeospatialStats;
+ currentEncodings.addAll(rlEncodings);
+ currentEncodings.addAll(dlEncodings);
+ currentEncodings.addAll(dataEncodings);
+ currentStatistics = totalStats;
+ currentSizeStatistics = totalSizeStats;
+ currentGeospatialStatistics = totalGeospatialStats;
- this.columnIndexBuilder = columnIndexBuilder;
- this.offsetIndexBuilder = offsetIndexBuilder;
+ this.columnIndexBuilder = columnIndexBuilder;
+ this.offsetIndexBuilder = offsetIndexBuilder;
- endColumn();
+ endColumn();
+ });
}
/**
@@ -1530,34 +1582,36 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void endColumn() throws IOException {
- state = state.endColumn();
- LOG.debug("{}: end column", out.getPos());
- if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
- currentColumnIndexes.add(null);
- } else {
- currentColumnIndexes.add(columnIndexBuilder.build());
- }
- currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
- currentBlock.addColumn(ColumnChunkMetaData.get(
- currentChunkPath,
- currentChunkType,
- currentChunkCodec,
- encodingStatsBuilder.build(),
- currentEncodings,
- currentStatistics,
- currentChunkFirstDataPage,
- currentChunkDictionaryPageOffset,
- currentChunkValueCount,
- compressedLength,
- uncompressedLength,
- currentSizeStatistics,
- currentGeospatialStatistics));
- this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
- this.uncompressedLength = 0;
- this.compressedLength = 0;
- this.currentChunkDictionaryPageOffset = 0;
- columnIndexBuilder = null;
- offsetIndexBuilder = null;
+ withAbortOnFailure(() -> {
+ state = state.endColumn();
+ LOG.debug("{}: end column", out.getPos());
+ if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+ currentColumnIndexes.add(null);
+ } else {
+ currentColumnIndexes.add(columnIndexBuilder.build());
+ }
+ currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ currentChunkPath,
+ currentChunkType,
+ currentChunkCodec,
+ encodingStatsBuilder.build(),
+ currentEncodings,
+ currentStatistics,
+ currentChunkFirstDataPage,
+ currentChunkDictionaryPageOffset,
+ currentChunkValueCount,
+ compressedLength,
+ uncompressedLength,
+ currentSizeStatistics,
+ currentGeospatialStatistics));
+ this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
+ this.uncompressedLength = 0;
+ this.compressedLength = 0;
+ this.currentChunkDictionaryPageOffset = 0;
+ columnIndexBuilder = null;
+ offsetIndexBuilder = null;
+ });
}
/**
@@ -1566,22 +1620,24 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void endBlock() throws IOException {
- if (currentRecordCount == 0) {
- throw new ParquetEncodingException("End block with zero record");
- }
+ withAbortOnFailure(() -> {
+ if (currentRecordCount == 0) {
+ throw new ParquetEncodingException("End block with zero record");
+ }
- state = state.endBlock();
- LOG.debug("{}: end block", out.getPos());
- currentBlock.setRowCount(currentRecordCount);
- currentBlock.setOrdinal(blocks.size());
- blocks.add(currentBlock);
- columnIndexes.add(currentColumnIndexes);
- offsetIndexes.add(currentOffsetIndexes);
- bloomFilters.add(currentBloomFilters);
- currentColumnIndexes = null;
- currentOffsetIndexes = null;
- currentBloomFilters = null;
- currentBlock = null;
+ state = state.endBlock();
+ LOG.debug("{}: end block", out.getPos());
+ currentBlock.setRowCount(currentRecordCount);
+ currentBlock.setOrdinal(blocks.size());
+ blocks.add(currentBlock);
+ columnIndexes.add(currentColumnIndexes);
+ offsetIndexes.add(currentOffsetIndexes);
+ bloomFilters.add(currentBloomFilters);
+ currentColumnIndexes = null;
+ currentOffsetIndexes = null;
+ currentBloomFilters = null;
+ currentBlock = null;
+ });
}
/**
@@ -1598,9 +1654,11 @@ public class ParquetFileWriter implements AutoCloseable {
}
public void appendFile(InputFile file) throws IOException {
- try (ParquetFileReader reader = ParquetFileReader.open(file)) {
- reader.appendTo(this);
- }
+ withAbortOnFailure(() -> {
+ try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+ reader.appendTo(this);
+ }
+ });
}
/**
@@ -1619,9 +1677,11 @@ public class ParquetFileWriter implements AutoCloseable {
public void appendRowGroups(SeekableInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns)
throws IOException {
- for (BlockMetaData block : rowGroups) {
- appendRowGroup(file, block, dropColumns);
- }
+ withAbortOnFailure(() -> {
+ for (BlockMetaData block : rowGroups) {
+ appendRowGroup(file, block, dropColumns);
+ }
+ });
}
/**
@@ -1639,83 +1699,86 @@ public class ParquetFileWriter implements AutoCloseable {
public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)
throws IOException {
- startBlock(rowGroup.getRowCount());
-
- Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
- for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
- columnsToCopy.put(chunk.getPath().toDotString(), chunk);
- }
+ withAbortOnFailure(() -> {
+ startBlock(rowGroup.getRowCount());
- List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
-
- for (ColumnDescriptor descriptor : schema.getColumns()) {
- String path = ColumnPath.get(descriptor.getPath()).toDotString();
- ColumnChunkMetaData chunk = columnsToCopy.remove(path);
- if (chunk != null) {
- columnsInOrder.add(chunk);
- } else {
- throw new IllegalArgumentException(
- String.format("Missing column '%s', cannot copy row group: %s",
path, rowGroup));
+ Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
+ for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+ columnsToCopy.put(chunk.getPath().toDotString(), chunk);
}
- }
- // complain if some columns would be dropped and that's not okay
- if (!dropColumns && !columnsToCopy.isEmpty()) {
- throw new IllegalArgumentException(String.format(
- "Columns cannot be copied (missing from target schema): %s",
- String.join(", ", columnsToCopy.keySet())));
- }
-
- // copy the data for all chunks
- long start = -1;
- long length = 0;
- long blockUncompressedSize = 0L;
- for (int i = 0; i < columnsInOrder.size(); i += 1) {
- ColumnChunkMetaData chunk = columnsInOrder.get(i);
-
- // get this chunk's start position in the new file
- long newChunkStart = out.getPos() + length;
+ List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
- // add this chunk to be copied with any previous chunks
- if (start < 0) {
- // no previous chunk included, start at this chunk's starting pos
- start = chunk.getStartingPos();
+ for (ColumnDescriptor descriptor : schema.getColumns()) {
+ String path = ColumnPath.get(descriptor.getPath()).toDotString();
+ ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+ if (chunk != null) {
+ columnsInOrder.add(chunk);
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Missing column '%s', cannot copy row group: %s",
path, rowGroup));
+ }
}
- length += chunk.getTotalSize();
-
- if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
- // not contiguous. do the copy now.
- copy(from, out, start, length);
- // reset to start at the next column chunk
- start = -1;
- length = 0;
+
+ // complain if some columns would be dropped and that's not okay
+ if (!dropColumns && !columnsToCopy.isEmpty()) {
+ throw new IllegalArgumentException(String.format(
+ "Columns cannot be copied (missing from target schema): %s",
+ String.join(", ", columnsToCopy.keySet())));
}
- // TODO: column/offset indexes are not copied
- // (it would require seeking to the end of the file for each row groups)
- currentColumnIndexes.add(null);
- currentOffsetIndexes.add(null);
+ // copy the data for all chunks
+ long start = -1;
+ long length = 0;
+ long blockUncompressedSize = 0L;
+ for (int i = 0; i < columnsInOrder.size(); i += 1) {
+ ColumnChunkMetaData chunk = columnsInOrder.get(i);
- Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
- currentBlock.addColumn(ColumnChunkMetaData.get(
- chunk.getPath(),
- chunk.getPrimitiveType(),
- chunk.getCodec(),
- chunk.getEncodingStats(),
- chunk.getEncodings(),
- chunk.getStatistics(),
- offsets.firstDataPageOffset,
- offsets.dictionaryPageOffset,
- chunk.getValueCount(),
- chunk.getTotalSize(),
- chunk.getTotalUncompressedSize()));
+ // get this chunk's start position in the new file
+ long newChunkStart = out.getPos() + length;
- blockUncompressedSize += chunk.getTotalUncompressedSize();
- }
+ // add this chunk to be copied with any previous chunks
+ if (start < 0) {
+ // no previous chunk included, start at this chunk's starting pos
+ start = chunk.getStartingPos();
+ }
+ length += chunk.getTotalSize();
+
+ if ((i + 1) == columnsInOrder.size()
+ || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+ // not contiguous. do the copy now.
+ copy(from, out, start, length);
+ // reset to start at the next column chunk
+ start = -1;
+ length = 0;
+ }
- currentBlock.setTotalByteSize(blockUncompressedSize);
+ // TODO: column/offset indexes are not copied
+ // (it would require seeking to the end of the file for each row groups)
+ currentColumnIndexes.add(null);
+ currentOffsetIndexes.add(null);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
+
+ blockUncompressedSize += chunk.getTotalUncompressedSize();
+ }
- endBlock();
+ currentBlock.setTotalByteSize(blockUncompressedSize);
+
+ endBlock();
+ });
}
/**
@@ -1735,36 +1798,41 @@ public class ParquetFileWriter implements AutoCloseable {
ColumnIndex columnIndex,
OffsetIndex offsetIndex)
throws IOException {
- long start = chunk.getStartingPos();
- long length = chunk.getTotalSize();
- long newChunkStart = out.getPos();
+ withAbortOnFailure(() -> {
+ long start = chunk.getStartingPos();
+ long length = chunk.getTotalSize();
+ long newChunkStart = out.getPos();
- if (offsetIndex != null && newChunkStart != start) {
- offsetIndex =
- OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start);
- }
+ OffsetIndex effectiveOffsetIndex = offsetIndex;
- copy(from, out, start, length);
+ if (effectiveOffsetIndex != null && newChunkStart != start) {
+ effectiveOffsetIndex = OffsetIndexBuilder.getBuilder()
+ .fromOffsetIndex(effectiveOffsetIndex)
+ .build(newChunkStart - start);
+ }
- currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
- currentColumnIndexes.add(columnIndex);
- currentOffsetIndexes.add(offsetIndex);
+ copy(from, out, start, length);
- Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
- currentBlock.addColumn(ColumnChunkMetaData.get(
- chunk.getPath(),
- chunk.getPrimitiveType(),
- chunk.getCodec(),
- chunk.getEncodingStats(),
- chunk.getEncodings(),
- chunk.getStatistics(),
- offsets.firstDataPageOffset,
- offsets.dictionaryPageOffset,
- chunk.getValueCount(),
- chunk.getTotalSize(),
- chunk.getTotalUncompressedSize()));
+ currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+ currentColumnIndexes.add(columnIndex);
+ currentOffsetIndexes.add(effectiveOffsetIndex);
+
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+ currentBlock.addColumn(ColumnChunkMetaData.get(
+ chunk.getPath(),
+ chunk.getPrimitiveType(),
+ chunk.getCodec(),
+ chunk.getEncodingStats(),
+ chunk.getEncodings(),
+ chunk.getStatistics(),
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
+ chunk.getValueCount(),
+ chunk.getTotalSize(),
+ chunk.getTotalUncompressedSize()));
- currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+ });
}
// Buffers for the copy function.
@@ -1804,17 +1872,25 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while writing
*/
public void end(Map<String, String> extraMetaData) throws IOException {
- try {
- state = state.end();
- serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
- serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
- serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
- LOG.debug("{}: end", out.getPos());
- this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
- serializeFooter(footer, out, fileEncryptor, metadataConverter);
- } finally {
- close();
- }
+ withAbortOnFailure(() -> {
+ try {
+ state = state.end();
+ serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+ serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+ serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+ LOG.debug("{}: end", out.getPos());
+ this.footer =
+ new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+ serializeFooter(footer, out, fileEncryptor, metadataConverter);
+ } finally {
+ close();
+ }
+ });
+ }
+
+ /* Mark the writer as aborted to avoid flushing incomplete data. */
+ public void abort() {
+ aborted = true;
}
@Override
@@ -1822,8 +1898,13 @@ public class ParquetFileWriter implements AutoCloseable {
if (closed) {
return;
}
- try (PositionOutputStream temp = out) {
- temp.flush();
+
+ try {
+ if (!aborted) {
+ try (PositionOutputStream temp = out) {
+ temp.flush();
+ }
+ }
if (crcAllocator != null) {
crcAllocator.close();
}
@@ -2274,11 +2355,11 @@ public class ParquetFileWriter implements AutoCloseable {
* @throws IOException if there is an error while getting the current stream's position
*/
public long getPos() throws IOException {
- return out.getPos();
+ return withAbortOnFailure(() -> out.getPos());
}
public long getNextRowGroupSize() throws IOException {
- return alignment.nextRowGroupSize(out);
+ return withAbortOnFailure(() -> alignment.nextRowGroupSize(out));
}
/**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index d73079c92..9a69ee478 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -44,6 +44,7 @@ import static org.junit.Assert.assertTrue;
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -52,6 +53,7 @@ import java.util.concurrent.Callable;
import net.openhft.hashing.LongHashFunction;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -780,4 +782,42 @@ public class TestParquetWriter {
"Cannot set both path and file", IllegalStateException.class,
(Callable<ParquetWriter<Group>>) () ->
ExampleParquetWriter.builder(path).withFile(outputFile).build());
}
+
+ @Test
+ public void testNoFlushAfterException() throws Exception {
+ final File testDir = temp.newFile();
+ testDir.delete();
+
+ final Path file = new Path(testDir.getAbsolutePath(), "test.parquet");
+
+ MessageType schema = Types.buildMessage()
+ .required(BINARY)
+ .named("binary_field")
+ .required(INT32)
+ .named("int32_field")
+ .named("test_schema_abort");
+ Configuration conf = new Configuration();
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.toString()))
+ .withAllocator(allocator)
+ .withType(schema)
+ .build()) {
+
+ SimpleGroupFactory f = new SimpleGroupFactory(schema);
+ writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123));
+
+ Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+ internalWriterField.setAccessible(true);
+ Object internalWriter = internalWriterField.get(writer);
+
+ Field abortedField = internalWriter.getClass().getDeclaredField("aborted");
+ abortedField.setAccessible(true);
+ abortedField.setBoolean(internalWriter, true);
+ writer.close();
+ }
+
+ // After closing, check whether file exists or is empty
+ FileSystem fs = file.getFileSystem(conf);
+ assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
+ }
}