This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch parquet-1.16.x
in repository https://gitbox.apache.org/repos/asf/parquet-java.git


The following commit(s) were added to refs/heads/parquet-1.16.x by this push:
     new 3347f6d0a GH-3350: Avoid flushing data to cloud when exception is thrown (#3351)
3347f6d0a is described below

commit 3347f6d0ace2ac0d804e39a1df43992e309c3aa2
Author: Jiayi-Wang-db <[email protected]>
AuthorDate: Mon Nov 3 18:22:34 2025 +0100

    GH-3350: Avoid flushing data to cloud when exception is thrown (#3351)
    
    (cherry picked from commit 2bcd2bde74771663d14a8429ed1dc6e5026766e6)
---
 .../hadoop/InternalParquetRecordWriter.java        |   4 +
 .../apache/parquet/hadoop/ParquetFileWriter.java   | 865 +++++++++++----------
 .../apache/parquet/hadoop/TestParquetWriter.java   |  40 +
 3 files changed, 517 insertions(+), 392 deletions(-)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index f29628680..41b068d01 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -129,6 +129,7 @@ class InternalParquetRecordWriter<T> {
     if (!closed) {
       try {
         if (aborted) {
+          parquetFileWriter.abort();
           return;
         }
         flushRowGroupToStore();
@@ -140,6 +141,9 @@ class InternalParquetRecordWriter<T> {
         }
         finalMetadata.putAll(finalWriteContext.getExtraMetaData());
         parquetFileWriter.end(finalMetadata);
+      } catch (Exception e) {
+        parquetFileWriter.abort();
+        throw e;
       } finally {
         AutoCloseables.uncheckedClose(columnStore, pageStore, bloomFilterWriteStore, parquetFileWriter);
         closed = true;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 4d17a1d6e..82f4577b8 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -173,6 +173,7 @@ public class ParquetFileWriter implements AutoCloseable {
 
   // set when end is called
   private ParquetMetadata footer = null;
+  private boolean aborted;
   private boolean closed;
 
   private final CRC32 crc;
@@ -335,6 +336,34 @@ public class ParquetFileWriter implements AutoCloseable {
         ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }
 
+  @FunctionalInterface
+  interface IOCallable<T> {
+    T call() throws IOException;
+  }
+
+  private <T> T withAbortOnFailure(IOCallable<T> action) throws IOException {
+    try {
+      return action.call();
+    } catch (IOException e) {
+      aborted = true;
+      throw e;
+    }
+  }
+
+  @FunctionalInterface
+  interface IORunnable {
+    void run() throws IOException;
+  }
+
+  private void withAbortOnFailure(IORunnable action) throws IOException {
+    try {
+      action.run();
+    } catch (IOException e) {
+      aborted = true;
+      throw e;
+    }
+  }
+
   /**
    * @param file                      OutputFile to create or overwrite
    * @param schema                    the schema of the data
@@ -565,13 +594,15 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void start() throws IOException {
-    state = state.start();
-    LOG.debug("{}: start", out.getPos());
-    byte[] magic = MAGIC;
-    if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
-      magic = EFMAGIC;
-    }
-    out.write(magic);
+    withAbortOnFailure(() -> {
+      state = state.start();
+      LOG.debug("{}: start", out.getPos());
+      byte[] magic = MAGIC;
+      if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
+        magic = EFMAGIC;
+      }
+      out.write(magic);
+    });
   }
 
   public InternalFileEncryptor getEncryptor() {
@@ -585,19 +616,21 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void startBlock(long recordCount) throws IOException {
-    state = state.startBlock();
-    LOG.debug("{}: start block", out.getPos());
-    //    out.write(MAGIC); // TODO: add a magic delimiter
+    withAbortOnFailure(() -> {
+      state = state.startBlock();
+      LOG.debug("{}: start block", out.getPos());
+      //    out.write(MAGIC); // TODO: add a magic delimiter
 
-    alignment.alignForRowGroup(out);
+      alignment.alignForRowGroup(out);
 
-    currentBlock = new BlockMetaData();
-    currentRecordCount = recordCount;
+      currentBlock = new BlockMetaData();
+      currentRecordCount = recordCount;
 
-    currentColumnIndexes = new ArrayList<>();
-    currentOffsetIndexes = new ArrayList<>();
+      currentColumnIndexes = new ArrayList<>();
+      currentOffsetIndexes = new ArrayList<>();
 
-    currentBloomFilters = new HashMap<>();
+      currentBloomFilters = new HashMap<>();
+    });
   }
 
   /**
@@ -610,28 +643,31 @@ public class ParquetFileWriter implements AutoCloseable {
    */
   public void startColumn(ColumnDescriptor descriptor, long valueCount, CompressionCodecName compressionCodecName)
       throws IOException {
-    state = state.startColumn();
-    encodingStatsBuilder.clear();
-    currentEncodings = new HashSet<Encoding>();
-    currentChunkPath = ColumnPath.get(descriptor.getPath());
-    currentChunkType = descriptor.getPrimitiveType();
-    currentChunkCodec = compressionCodecName;
-    currentChunkValueCount = valueCount;
-    currentChunkFirstDataPage = -1;
-    compressedLength = 0;
-    uncompressedLength = 0;
-    // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
-    currentStatistics = null;
-    currentSizeStatistics = SizeStatistics.newBuilder(
-            descriptor.getPrimitiveType(),
-            descriptor.getMaxRepetitionLevel(),
-            descriptor.getMaxDefinitionLevel())
-        .build();
-    currentGeospatialStatistics =
-        GeospatialStatistics.newBuilder(descriptor.getPrimitiveType()).build();
-
-    columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, 
columnIndexTruncateLength);
-    offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    withAbortOnFailure(() -> {
+      state = state.startColumn();
+      encodingStatsBuilder.clear();
+      currentEncodings = new HashSet<Encoding>();
+      currentChunkPath = ColumnPath.get(descriptor.getPath());
+      currentChunkType = descriptor.getPrimitiveType();
+      currentChunkCodec = compressionCodecName;
+      currentChunkValueCount = valueCount;
+      currentChunkFirstDataPage = -1;
+      compressedLength = 0;
+      uncompressedLength = 0;
+      // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed
+      // one
+      currentStatistics = null;
+      currentSizeStatistics = SizeStatistics.newBuilder(
+              descriptor.getPrimitiveType(),
+              descriptor.getMaxRepetitionLevel(),
+              descriptor.getMaxDefinitionLevel())
+          .build();
+      currentGeospatialStatistics = 
GeospatialStatistics.newBuilder(descriptor.getPrimitiveType())
+          .build();
+
+      columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, 
columnIndexTruncateLength);
+      offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+    });
   }
 
   /**
@@ -641,45 +677,51 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
-    writeDictionaryPage(dictionaryPage, null, null);
+    withAbortOnFailure(() -> {
+      writeDictionaryPage(dictionaryPage, null, null);
+    });
   }
 
   public void writeDictionaryPage(
       DictionaryPage dictionaryPage, BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
-    state = state.write();
-    LOG.debug("{}: write dictionary page: {} values", out.getPos(), 
dictionaryPage.getDictionarySize());
-    currentChunkDictionaryPageOffset = out.getPos();
-    int uncompressedSize = dictionaryPage.getUncompressedSize();
-    int compressedPageSize = Math.toIntExact(dictionaryPage.getBytes().size());
-    if (pageWriteChecksumEnabled) {
-      crc.reset();
-      crcUpdate(dictionaryPage.getBytes());
-      metadataConverter.writeDictionaryPageHeader(
-          uncompressedSize,
-          compressedPageSize,
-          dictionaryPage.getDictionarySize(),
-          dictionaryPage.getEncoding(),
-          (int) crc.getValue(),
-          out,
-          headerBlockEncryptor,
-          AAD);
-    } else {
-      metadataConverter.writeDictionaryPageHeader(
-          uncompressedSize,
-          compressedPageSize,
-          dictionaryPage.getDictionarySize(),
-          dictionaryPage.getEncoding(),
-          out,
-          headerBlockEncryptor,
-          AAD);
-    }
-    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
-    this.uncompressedLength += uncompressedSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    LOG.debug("{}: write dictionary page content {}", out.getPos(), 
compressedPageSize);
-    dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
-    encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
-    currentEncodings.add(dictionaryPage.getEncoding());
+    withAbortOnFailure(() -> {
+      state = state.write();
+      LOG.debug("{}: write dictionary page: {} values", out.getPos(), 
dictionaryPage.getDictionarySize());
+      currentChunkDictionaryPageOffset = out.getPos();
+      int uncompressedSize = dictionaryPage.getUncompressedSize();
+      int compressedPageSize = 
Math.toIntExact(dictionaryPage.getBytes().size());
+      if (pageWriteChecksumEnabled) {
+        crc.reset();
+        crcUpdate(dictionaryPage.getBytes());
+        metadataConverter.writeDictionaryPageHeader(
+            uncompressedSize,
+            compressedPageSize,
+            dictionaryPage.getDictionarySize(),
+            dictionaryPage.getEncoding(),
+            (int) crc.getValue(),
+            out,
+            headerBlockEncryptor,
+            AAD);
+      } else {
+        metadataConverter.writeDictionaryPageHeader(
+            uncompressedSize,
+            compressedPageSize,
+            dictionaryPage.getDictionarySize(),
+            dictionaryPage.getEncoding(),
+            out,
+            headerBlockEncryptor,
+            AAD);
+      }
+      long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
+      this.uncompressedLength += uncompressedSize + headerSize;
+      this.compressedLength += compressedPageSize + headerSize;
+      LOG.debug("{}: write dictionary page content {}", out.getPos(), 
compressedPageSize);
+      dictionaryPage
+          .getBytes()
+          .writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
+      encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
+      currentEncodings.add(dictionaryPage.getEncoding());
+    });
   }
 
   /**
@@ -871,22 +913,24 @@ public class ParquetFileWriter implements AutoCloseable {
       byte[] pageHeaderAAD,
       SizeStatistics sizeStatistics)
       throws IOException {
-    long beforeHeader = out.getPos();
-    innerWriteDataPage(
-        valueCount,
-        uncompressedPageSize,
-        bytes,
-        statistics,
-        rlEncoding,
-        dlEncoding,
-        valuesEncoding,
-        metadataBlockEncryptor,
-        pageHeaderAAD,
-        sizeStatistics);
-    offsetIndexBuilder.add(
-        toIntWithCheck(out.getPos() - beforeHeader, "page"),
-        rowCount,
-        sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+    withAbortOnFailure(() -> {
+      long beforeHeader = out.getPos();
+      innerWriteDataPage(
+          valueCount,
+          uncompressedPageSize,
+          bytes,
+          statistics,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          metadataBlockEncryptor,
+          pageHeaderAAD,
+          sizeStatistics);
+      offsetIndexBuilder.add(
+          toIntWithCheck(out.getPos() - beforeHeader, "page"),
+          rowCount,
+          sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+    });
   }
 
   private void innerWriteDataPage(
@@ -978,51 +1022,53 @@ public class ParquetFileWriter implements AutoCloseable {
       byte[] pageHeaderAAD,
       SizeStatistics sizeStatistics)
       throws IOException {
-    state = state.write();
-    long beforeHeader = out.getPos();
-    if (currentChunkFirstDataPage < 0) {
-      currentChunkFirstDataPage = beforeHeader;
-    }
-    LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
-    int compressedPageSize = toIntWithCheck(bytes.size(), "page");
-    if (pageWriteChecksumEnabled) {
-      crc.reset();
-      crcUpdate(bytes);
-      metadataConverter.writeDataPageV1Header(
-          uncompressedPageSize,
-          compressedPageSize,
-          valueCount,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          (int) crc.getValue(),
-          out,
-          metadataBlockEncryptor,
-          pageHeaderAAD);
-    } else {
-      metadataConverter.writeDataPageV1Header(
-          uncompressedPageSize,
-          compressedPageSize,
-          valueCount,
-          rlEncoding,
-          dlEncoding,
-          valuesEncoding,
-          out,
-          metadataBlockEncryptor,
-          pageHeaderAAD);
-    }
-    long headerSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedPageSize + headerSize;
-    this.compressedLength += compressedPageSize + headerSize;
-    LOG.debug("{}: write data page content {}", out.getPos(), 
compressedPageSize);
-    bytes.writeAllTo(out);
+    withAbortOnFailure(() -> {
+      state = state.write();
+      long beforeHeader = out.getPos();
+      if (currentChunkFirstDataPage < 0) {
+        currentChunkFirstDataPage = beforeHeader;
+      }
+      LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
+      int compressedPageSize = toIntWithCheck(bytes.size(), "page");
+      if (pageWriteChecksumEnabled) {
+        crc.reset();
+        crcUpdate(bytes);
+        metadataConverter.writeDataPageV1Header(
+            uncompressedPageSize,
+            compressedPageSize,
+            valueCount,
+            rlEncoding,
+            dlEncoding,
+            valuesEncoding,
+            (int) crc.getValue(),
+            out,
+            metadataBlockEncryptor,
+            pageHeaderAAD);
+      } else {
+        metadataConverter.writeDataPageV1Header(
+            uncompressedPageSize,
+            compressedPageSize,
+            valueCount,
+            rlEncoding,
+            dlEncoding,
+            valuesEncoding,
+            out,
+            metadataBlockEncryptor,
+            pageHeaderAAD);
+      }
+      long headerSize = out.getPos() - beforeHeader;
+      this.uncompressedLength += uncompressedPageSize + headerSize;
+      this.compressedLength += compressedPageSize + headerSize;
+      LOG.debug("{}: write data page content {}", out.getPos(), 
compressedPageSize);
+      bytes.writeAllTo(out);
 
-    mergeColumnStatistics(statistics, sizeStatistics);
+      mergeColumnStatistics(statistics, sizeStatistics);
 
-    encodingStatsBuilder.addDataEncoding(valuesEncoding);
-    currentEncodings.add(rlEncoding);
-    currentEncodings.add(dlEncoding);
-    currentEncodings.add(valuesEncoding);
+      encodingStatsBuilder.addDataEncoding(valuesEncoding);
+      currentEncodings.add(rlEncoding);
+      currentEncodings.add(dlEncoding);
+      currentEncodings.add(valuesEncoding);
+    });
   }
 
   /**
@@ -1297,76 +1343,79 @@ public class ParquetFileWriter implements AutoCloseable {
       byte[] pageHeaderAAD,
       SizeStatistics sizeStatistics)
       throws IOException {
-    state = state.write();
-    int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page 
repetition levels");
-    int dlByteLength = toIntWithCheck(definitionLevels.size(), "page 
definition levels");
+    withAbortOnFailure(() -> {
+      state = state.write();
+      int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page 
repetition levels");
+      int dlByteLength = toIntWithCheck(definitionLevels.size(), "page 
definition levels");
 
-    int compressedSize = toIntWithCheck(bytes.size() + repetitionLevels.size() 
+ definitionLevels.size(), "page");
+      int compressedSize =
+          toIntWithCheck(bytes.size() + repetitionLevels.size() + 
definitionLevels.size(), "page");
 
-    int uncompressedSize =
-        toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + 
definitionLevels.size(), "page");
+      int uncompressedSize =
+          toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + 
definitionLevels.size(), "page");
 
-    long beforeHeader = out.getPos();
-    if (currentChunkFirstDataPage < 0) {
-      currentChunkFirstDataPage = beforeHeader;
-    }
-
-    if (pageWriteChecksumEnabled) {
-      crc.reset();
-      if (repetitionLevels.size() > 0) {
-        crcUpdate(repetitionLevels);
-      }
-      if (definitionLevels.size() > 0) {
-        crcUpdate(definitionLevels);
+      long beforeHeader = out.getPos();
+      if (currentChunkFirstDataPage < 0) {
+        currentChunkFirstDataPage = beforeHeader;
       }
-      if (bytes.size() > 0) {
-        crcUpdate(bytes);
+
+      if (pageWriteChecksumEnabled) {
+        crc.reset();
+        if (repetitionLevels.size() > 0) {
+          crcUpdate(repetitionLevels);
+        }
+        if (definitionLevels.size() > 0) {
+          crcUpdate(definitionLevels);
+        }
+        if (bytes.size() > 0) {
+          crcUpdate(bytes);
+        }
+        metadataConverter.writeDataPageV2Header(
+            uncompressedSize,
+            compressedSize,
+            valueCount,
+            nullCount,
+            rowCount,
+            dataEncoding,
+            rlByteLength,
+            dlByteLength,
+            compressed,
+            (int) crc.getValue(),
+            out,
+            metadataBlockEncryptor,
+            pageHeaderAAD);
+      } else {
+        metadataConverter.writeDataPageV2Header(
+            uncompressedSize,
+            compressedSize,
+            valueCount,
+            nullCount,
+            rowCount,
+            dataEncoding,
+            rlByteLength,
+            dlByteLength,
+            compressed,
+            out,
+            metadataBlockEncryptor,
+            pageHeaderAAD);
       }
-      metadataConverter.writeDataPageV2Header(
-          uncompressedSize,
-          compressedSize,
-          valueCount,
-          nullCount,
-          rowCount,
-          dataEncoding,
-          rlByteLength,
-          dlByteLength,
-          compressed,
-          (int) crc.getValue(),
-          out,
-          metadataBlockEncryptor,
-          pageHeaderAAD);
-    } else {
-      metadataConverter.writeDataPageV2Header(
-          uncompressedSize,
-          compressedSize,
-          valueCount,
-          nullCount,
-          rowCount,
-          dataEncoding,
-          rlByteLength,
-          dlByteLength,
-          compressed,
-          out,
-          metadataBlockEncryptor,
-          pageHeaderAAD);
-    }
 
-    long headersSize = out.getPos() - beforeHeader;
-    this.uncompressedLength += uncompressedSize + headersSize;
-    this.compressedLength += compressedSize + headersSize;
+      long headersSize = out.getPos() - beforeHeader;
+      this.uncompressedLength += uncompressedSize + headersSize;
+      this.compressedLength += compressedSize + headersSize;
 
-    mergeColumnStatistics(statistics, sizeStatistics);
+      mergeColumnStatistics(statistics, sizeStatistics);
 
-    currentEncodings.add(dataEncoding);
-    encodingStatsBuilder.addDataEncoding(dataEncoding);
+      currentEncodings.add(dataEncoding);
+      encodingStatsBuilder.addDataEncoding(dataEncoding);
 
-    BytesInput.concat(repetitionLevels, definitionLevels, 
bytes).writeAllTo(out);
+      BytesInput.concat(repetitionLevels, definitionLevels, 
bytes).writeAllTo(out);
 
-    offsetIndexBuilder.add(
-        toIntWithCheck(out.getPos() - beforeHeader, "page"),
-        rowCount,
-        sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+      offsetIndexBuilder.add(
+          toIntWithCheck(out.getPos() - beforeHeader, "page"),
+          rowCount,
+          sizeStatistics != null ? 
sizeStatistics.getUnencodedByteArrayDataBytes() : Optional.empty());
+    });
   }
 
   private void crcUpdate(BytesInput bytes) {
@@ -1457,58 +1506,61 @@ public class ParquetFileWriter implements AutoCloseable {
       int columnOrdinal,
       byte[] fileAAD)
       throws IOException {
-    startColumn(descriptor, valueCount, compressionCodecName);
-
-    state = state.write();
-    if (dictionaryPage != null) {
-      byte[] dictonaryPageHeaderAAD = null;
-      if (null != headerBlockEncryptor) {
-        dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
-            fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, 
columnOrdinal, -1);
+    withAbortOnFailure(() -> {
+      startColumn(descriptor, valueCount, compressionCodecName);
+
+      state = state.write();
+      if (dictionaryPage != null) {
+        byte[] dictonaryPageHeaderAAD = null;
+        if (null != headerBlockEncryptor) {
+          dictonaryPageHeaderAAD = AesCipher.createModuleAAD(
+              fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, 
columnOrdinal, -1);
+        }
+        writeDictionaryPage(dictionaryPage, headerBlockEncryptor, 
dictonaryPageHeaderAAD);
       }
-      writeDictionaryPage(dictionaryPage, headerBlockEncryptor, 
dictonaryPageHeaderAAD);
-    }
 
-    if (bloomFilter != null) {
-      // write bloom filter if one of data pages is not dictionary encoded
-      boolean isWriteBloomFilter = false;
-      for (Encoding encoding : dataEncodings) {
-        // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, 
`RLE_DICTIONARY` is used in parquet v2
-        if (encoding != Encoding.PLAIN_DICTIONARY && encoding != 
Encoding.RLE_DICTIONARY) {
-          isWriteBloomFilter = true;
-          break;
+      if (bloomFilter != null) {
+        // write bloom filter if one of data pages is not dictionary encoded
+        boolean isWriteBloomFilter = false;
+        for (Encoding encoding : dataEncodings) {
+          // dictionary encoding: `PLAIN_DICTIONARY` is used in parquet v1, 
`RLE_DICTIONARY` is used in
+          // parquet v2
+          if (encoding != Encoding.PLAIN_DICTIONARY && encoding != 
Encoding.RLE_DICTIONARY) {
+            isWriteBloomFilter = true;
+            break;
+          }
+        }
+        if (isWriteBloomFilter) {
+          currentBloomFilters.put(String.join(".", descriptor.getPath()), 
bloomFilter);
+        } else {
+          LOG.info(
+              "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
+              descriptor.getPath());
         }
       }
-      if (isWriteBloomFilter) {
-        currentBloomFilters.put(String.join(".", descriptor.getPath()), 
bloomFilter);
-      } else {
-        LOG.info(
-            "No need to write bloom filter because column {} data pages are all encoded as dictionary.",
-            descriptor.getPath());
+      LOG.debug("{}: write data pages", out.getPos());
+      long headersSize = bytes.size() - compressedTotalPageSize;
+      this.uncompressedLength += uncompressedTotalPageSize + headersSize;
+      this.compressedLength += compressedTotalPageSize + headersSize;
+      LOG.debug("{}: write data pages content", out.getPos());
+      currentChunkFirstDataPage = out.getPos();
+      bytes.writeAllTo(out);
+      encodingStatsBuilder.addDataEncodings(dataEncodings);
+      if (rlEncodings.isEmpty()) {
+        encodingStatsBuilder.withV2Pages();
       }
-    }
-    LOG.debug("{}: write data pages", out.getPos());
-    long headersSize = bytes.size() - compressedTotalPageSize;
-    this.uncompressedLength += uncompressedTotalPageSize + headersSize;
-    this.compressedLength += compressedTotalPageSize + headersSize;
-    LOG.debug("{}: write data pages content", out.getPos());
-    currentChunkFirstDataPage = out.getPos();
-    bytes.writeAllTo(out);
-    encodingStatsBuilder.addDataEncodings(dataEncodings);
-    if (rlEncodings.isEmpty()) {
-      encodingStatsBuilder.withV2Pages();
-    }
-    currentEncodings.addAll(rlEncodings);
-    currentEncodings.addAll(dlEncodings);
-    currentEncodings.addAll(dataEncodings);
-    currentStatistics = totalStats;
-    currentSizeStatistics = totalSizeStats;
-    currentGeospatialStatistics = totalGeospatialStats;
+      currentEncodings.addAll(rlEncodings);
+      currentEncodings.addAll(dlEncodings);
+      currentEncodings.addAll(dataEncodings);
+      currentStatistics = totalStats;
+      currentSizeStatistics = totalSizeStats;
+      currentGeospatialStatistics = totalGeospatialStats;
 
-    this.columnIndexBuilder = columnIndexBuilder;
-    this.offsetIndexBuilder = offsetIndexBuilder;
+      this.columnIndexBuilder = columnIndexBuilder;
+      this.offsetIndexBuilder = offsetIndexBuilder;
 
-    endColumn();
+      endColumn();
+    });
   }
 
   /**
@@ -1530,34 +1582,36 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void endColumn() throws IOException {
-    state = state.endColumn();
-    LOG.debug("{}: end column", out.getPos());
-    if (columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() 
* MAX_STATS_SIZE) {
-      currentColumnIndexes.add(null);
-    } else {
-      currentColumnIndexes.add(columnIndexBuilder.build());
-    }
-    currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
-    currentBlock.addColumn(ColumnChunkMetaData.get(
-        currentChunkPath,
-        currentChunkType,
-        currentChunkCodec,
-        encodingStatsBuilder.build(),
-        currentEncodings,
-        currentStatistics,
-        currentChunkFirstDataPage,
-        currentChunkDictionaryPageOffset,
-        currentChunkValueCount,
-        compressedLength,
-        uncompressedLength,
-        currentSizeStatistics,
-        currentGeospatialStatistics));
-    this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + 
uncompressedLength);
-    this.uncompressedLength = 0;
-    this.compressedLength = 0;
-    this.currentChunkDictionaryPageOffset = 0;
-    columnIndexBuilder = null;
-    offsetIndexBuilder = null;
+    withAbortOnFailure(() -> {
+      state = state.endColumn();
+      LOG.debug("{}: end column", out.getPos());
+      if (columnIndexBuilder.getMinMaxSize() > 
columnIndexBuilder.getPageCount() * MAX_STATS_SIZE) {
+        currentColumnIndexes.add(null);
+      } else {
+        currentColumnIndexes.add(columnIndexBuilder.build());
+      }
+      currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
+      currentBlock.addColumn(ColumnChunkMetaData.get(
+          currentChunkPath,
+          currentChunkType,
+          currentChunkCodec,
+          encodingStatsBuilder.build(),
+          currentEncodings,
+          currentStatistics,
+          currentChunkFirstDataPage,
+          currentChunkDictionaryPageOffset,
+          currentChunkValueCount,
+          compressedLength,
+          uncompressedLength,
+          currentSizeStatistics,
+          currentGeospatialStatistics));
+      this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + 
uncompressedLength);
+      this.uncompressedLength = 0;
+      this.compressedLength = 0;
+      this.currentChunkDictionaryPageOffset = 0;
+      columnIndexBuilder = null;
+      offsetIndexBuilder = null;
+    });
   }
 
   /**
@@ -1566,22 +1620,24 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void endBlock() throws IOException {
-    if (currentRecordCount == 0) {
-      throw new ParquetEncodingException("End block with zero record");
-    }
+    withAbortOnFailure(() -> {
+      if (currentRecordCount == 0) {
+        throw new ParquetEncodingException("End block with zero record");
+      }
 
-    state = state.endBlock();
-    LOG.debug("{}: end block", out.getPos());
-    currentBlock.setRowCount(currentRecordCount);
-    currentBlock.setOrdinal(blocks.size());
-    blocks.add(currentBlock);
-    columnIndexes.add(currentColumnIndexes);
-    offsetIndexes.add(currentOffsetIndexes);
-    bloomFilters.add(currentBloomFilters);
-    currentColumnIndexes = null;
-    currentOffsetIndexes = null;
-    currentBloomFilters = null;
-    currentBlock = null;
+      state = state.endBlock();
+      LOG.debug("{}: end block", out.getPos());
+      currentBlock.setRowCount(currentRecordCount);
+      currentBlock.setOrdinal(blocks.size());
+      blocks.add(currentBlock);
+      columnIndexes.add(currentColumnIndexes);
+      offsetIndexes.add(currentOffsetIndexes);
+      bloomFilters.add(currentBloomFilters);
+      currentColumnIndexes = null;
+      currentOffsetIndexes = null;
+      currentBloomFilters = null;
+      currentBlock = null;
+    });
   }
 
   /**
@@ -1598,9 +1654,11 @@ public class ParquetFileWriter implements AutoCloseable {
   }
 
   public void appendFile(InputFile file) throws IOException {
-    try (ParquetFileReader reader = ParquetFileReader.open(file)) {
-      reader.appendTo(this);
-    }
+    withAbortOnFailure(() -> {
+      try (ParquetFileReader reader = ParquetFileReader.open(file)) {
+        reader.appendTo(this);
+      }
+    });
   }
 
   /**
@@ -1619,9 +1677,11 @@ public class ParquetFileWriter implements AutoCloseable {
 
   public void appendRowGroups(SeekableInputStream file, List<BlockMetaData> rowGroups, boolean dropColumns)
       throws IOException {
-    for (BlockMetaData block : rowGroups) {
-      appendRowGroup(file, block, dropColumns);
-    }
+    withAbortOnFailure(() -> {
+      for (BlockMetaData block : rowGroups) {
+        appendRowGroup(file, block, dropColumns);
+      }
+    });
   }
 
   /**
@@ -1639,83 +1699,86 @@ public class ParquetFileWriter implements AutoCloseable {
 
   public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup, boolean dropColumns)
       throws IOException {
-    startBlock(rowGroup.getRowCount());
-
-    Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
-    for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
-      columnsToCopy.put(chunk.getPath().toDotString(), chunk);
-    }
+    withAbortOnFailure(() -> {
+      startBlock(rowGroup.getRowCount());
 
-    List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
-
-    for (ColumnDescriptor descriptor : schema.getColumns()) {
-      String path = ColumnPath.get(descriptor.getPath()).toDotString();
-      ColumnChunkMetaData chunk = columnsToCopy.remove(path);
-      if (chunk != null) {
-        columnsInOrder.add(chunk);
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+      Map<String, ColumnChunkMetaData> columnsToCopy = new HashMap<String, ColumnChunkMetaData>();
+      for (ColumnChunkMetaData chunk : rowGroup.getColumns()) {
+        columnsToCopy.put(chunk.getPath().toDotString(), chunk);
       }
-    }
 
-    // complain if some columns would be dropped and that's not okay
-    if (!dropColumns && !columnsToCopy.isEmpty()) {
-      throw new IllegalArgumentException(String.format(
-          "Columns cannot be copied (missing from target schema): %s",
-          String.join(", ", columnsToCopy.keySet())));
-    }
-
-    // copy the data for all chunks
-    long start = -1;
-    long length = 0;
-    long blockUncompressedSize = 0L;
-    for (int i = 0; i < columnsInOrder.size(); i += 1) {
-      ColumnChunkMetaData chunk = columnsInOrder.get(i);
-
-      // get this chunk's start position in the new file
-      long newChunkStart = out.getPos() + length;
+      List<ColumnChunkMetaData> columnsInOrder = new ArrayList<ColumnChunkMetaData>();
 
-      // add this chunk to be copied with any previous chunks
-      if (start < 0) {
-        // no previous chunk included, start at this chunk's starting pos
-        start = chunk.getStartingPos();
+      for (ColumnDescriptor descriptor : schema.getColumns()) {
+        String path = ColumnPath.get(descriptor.getPath()).toDotString();
+        ColumnChunkMetaData chunk = columnsToCopy.remove(path);
+        if (chunk != null) {
+          columnsInOrder.add(chunk);
+        } else {
+          throw new IllegalArgumentException(
+              String.format("Missing column '%s', cannot copy row group: %s", path, rowGroup));
+        }
       }
-      length += chunk.getTotalSize();
-
-      if ((i + 1) == columnsInOrder.size() || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
-        // not contiguous. do the copy now.
-        copy(from, out, start, length);
-        // reset to start at the next column chunk
-        start = -1;
-        length = 0;
+
+      // complain if some columns would be dropped and that's not okay
+      if (!dropColumns && !columnsToCopy.isEmpty()) {
+        throw new IllegalArgumentException(String.format(
+            "Columns cannot be copied (missing from target schema): %s",
+            String.join(", ", columnsToCopy.keySet())));
       }
 
-      // TODO: column/offset indexes are not copied
-      // (it would require seeking to the end of the file for each row groups)
-      currentColumnIndexes.add(null);
-      currentOffsetIndexes.add(null);
+      // copy the data for all chunks
+      long start = -1;
+      long length = 0;
+      long blockUncompressedSize = 0L;
+      for (int i = 0; i < columnsInOrder.size(); i += 1) {
+        ColumnChunkMetaData chunk = columnsInOrder.get(i);
 
-      Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
-      currentBlock.addColumn(ColumnChunkMetaData.get(
-          chunk.getPath(),
-          chunk.getPrimitiveType(),
-          chunk.getCodec(),
-          chunk.getEncodingStats(),
-          chunk.getEncodings(),
-          chunk.getStatistics(),
-          offsets.firstDataPageOffset,
-          offsets.dictionaryPageOffset,
-          chunk.getValueCount(),
-          chunk.getTotalSize(),
-          chunk.getTotalUncompressedSize()));
+        // get this chunk's start position in the new file
+        long newChunkStart = out.getPos() + length;
 
-      blockUncompressedSize += chunk.getTotalUncompressedSize();
-    }
+        // add this chunk to be copied with any previous chunks
+        if (start < 0) {
+          // no previous chunk included, start at this chunk's starting pos
+          start = chunk.getStartingPos();
+        }
+        length += chunk.getTotalSize();
+
+        if ((i + 1) == columnsInOrder.size()
+            || columnsInOrder.get(i + 1).getStartingPos() != (start + length)) {
+          // not contiguous. do the copy now.
+          copy(from, out, start, length);
+          // reset to start at the next column chunk
+          start = -1;
+          length = 0;
+        }
 
-    currentBlock.setTotalByteSize(blockUncompressedSize);
+        // TODO: column/offset indexes are not copied
+        // (it would require seeking to the end of the file for each row groups)
+        currentColumnIndexes.add(null);
+        currentOffsetIndexes.add(null);
+
+        Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+        currentBlock.addColumn(ColumnChunkMetaData.get(
+            chunk.getPath(),
+            chunk.getPrimitiveType(),
+            chunk.getCodec(),
+            chunk.getEncodingStats(),
+            chunk.getEncodings(),
+            chunk.getStatistics(),
+            offsets.firstDataPageOffset,
+            offsets.dictionaryPageOffset,
+            chunk.getValueCount(),
+            chunk.getTotalSize(),
+            chunk.getTotalUncompressedSize()));
+
+        blockUncompressedSize += chunk.getTotalUncompressedSize();
+      }
 
-    endBlock();
+      currentBlock.setTotalByteSize(blockUncompressedSize);
+
+      endBlock();
+    });
   }
 
   /**
@@ -1735,36 +1798,41 @@ public class ParquetFileWriter implements AutoCloseable {
       ColumnIndex columnIndex,
       OffsetIndex offsetIndex)
       throws IOException {
-    long start = chunk.getStartingPos();
-    long length = chunk.getTotalSize();
-    long newChunkStart = out.getPos();
+    withAbortOnFailure(() -> {
+      long start = chunk.getStartingPos();
+      long length = chunk.getTotalSize();
+      long newChunkStart = out.getPos();
 
-    if (offsetIndex != null && newChunkStart != start) {
-      offsetIndex =
-          OffsetIndexBuilder.getBuilder().fromOffsetIndex(offsetIndex).build(newChunkStart - start);
-    }
+      OffsetIndex effectiveOffsetIndex = offsetIndex;
 
-    copy(from, out, start, length);
+      if (effectiveOffsetIndex != null && newChunkStart != start) {
+        effectiveOffsetIndex = OffsetIndexBuilder.getBuilder()
+            .fromOffsetIndex(effectiveOffsetIndex)
+            .build(newChunkStart - start);
+      }
 
-    currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
-    currentColumnIndexes.add(columnIndex);
-    currentOffsetIndexes.add(offsetIndex);
+      copy(from, out, start, length);
 
-    Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
-    currentBlock.addColumn(ColumnChunkMetaData.get(
-        chunk.getPath(),
-        chunk.getPrimitiveType(),
-        chunk.getCodec(),
-        chunk.getEncodingStats(),
-        chunk.getEncodings(),
-        chunk.getStatistics(),
-        offsets.firstDataPageOffset,
-        offsets.dictionaryPageOffset,
-        chunk.getValueCount(),
-        chunk.getTotalSize(),
-        chunk.getTotalUncompressedSize()));
+      currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
+      currentColumnIndexes.add(columnIndex);
+      currentOffsetIndexes.add(effectiveOffsetIndex);
+
+      Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
+      currentBlock.addColumn(ColumnChunkMetaData.get(
+          chunk.getPath(),
+          chunk.getPrimitiveType(),
+          chunk.getCodec(),
+          chunk.getEncodingStats(),
+          chunk.getEncodings(),
+          chunk.getStatistics(),
+          offsets.firstDataPageOffset,
+          offsets.dictionaryPageOffset,
+          chunk.getValueCount(),
+          chunk.getTotalSize(),
+          chunk.getTotalUncompressedSize()));
 
-    currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+      currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize());
+    });
   }
 
   // Buffers for the copy function.
@@ -1804,17 +1872,25 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while writing
    */
   public void end(Map<String, String> extraMetaData) throws IOException {
-    try {
-      state = state.end();
-      serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
-      serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
-      serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
-      LOG.debug("{}: end", out.getPos());
-      this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
-      serializeFooter(footer, out, fileEncryptor, metadataConverter);
-    } finally {
-      close();
-    }
+    withAbortOnFailure(() -> {
+      try {
+        state = state.end();
+        serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
+        serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
+        serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
+        LOG.debug("{}: end", out.getPos());
+        this.footer =
+            new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
+        serializeFooter(footer, out, fileEncryptor, metadataConverter);
+      } finally {
+        close();
+      }
+    });
+  }
+
+  /* Mark the writer as aborted to avoid flushing incomplete data. */
+  public void abort() {
+    aborted = true;
   }
 
   @Override
@@ -1822,8 +1898,13 @@ public class ParquetFileWriter implements AutoCloseable {
     if (closed) {
       return;
     }
-    try (PositionOutputStream temp = out) {
-      temp.flush();
+
+    try {
+      if (!aborted) {
+        try (PositionOutputStream temp = out) {
+          temp.flush();
+        }
+      }
       if (crcAllocator != null) {
         crcAllocator.close();
       }
@@ -2274,11 +2355,11 @@ public class ParquetFileWriter implements AutoCloseable {
    * @throws IOException if there is an error while getting the current stream's position
    */
   public long getPos() throws IOException {
-    return out.getPos();
+    return withAbortOnFailure(() -> out.getPos());
   }
 
   public long getNextRowGroupSize() throws IOException {
-    return alignment.nextRowGroupSize(out);
+    return withAbortOnFailure(() -> alignment.nextRowGroupSize(out));
   }
 
   /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
index 2cd83624f..38b66d770 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java
@@ -44,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -52,6 +53,7 @@ import java.util.concurrent.Callable;
 import net.openhft.hashing.LongHashFunction;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.bytes.HeapByteBufferAllocator;
@@ -722,4 +724,42 @@ public class TestParquetWriter {
       }
     }
   }
+
+  @Test
+  public void testNoFlushAfterException() throws Exception {
+    final File testDir = temp.newFile();
+    testDir.delete();
+
+    final Path file = new Path(testDir.getAbsolutePath(), "test.parquet");
+
+    MessageType schema = Types.buildMessage()
+        .required(BINARY)
+        .named("binary_field")
+        .required(INT32)
+        .named("int32_field")
+        .named("test_schema_abort");
+    Configuration conf = new Configuration();
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(new Path(file.toString()))
+        .withAllocator(allocator)
+        .withType(schema)
+        .build()) {
+
+      SimpleGroupFactory f = new SimpleGroupFactory(schema);
+      writer.write(f.newGroup().append("binary_field", "hello").append("int32_field", 123));
+
+      Field internalWriterField = ParquetWriter.class.getDeclaredField("writer");
+      internalWriterField.setAccessible(true);
+      Object internalWriter = internalWriterField.get(writer);
+
+      Field abortedField = internalWriter.getClass().getDeclaredField("aborted");
+      abortedField.setAccessible(true);
+      abortedField.setBoolean(internalWriter, true);
+      writer.close();
+    }
+
+    // After closing, check whether file exists or is empty
+    FileSystem fs = file.getFileSystem(conf);
+    assertTrue(!fs.exists(file) || fs.getFileStatus(file).getLen() == 0);
+  }
 }


Reply via email to