This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new a93207f19 PARQUET-2437: Avoid flushing at Parquet writes after an 
exception (#1285)
a93207f19 is described below

commit a93207f193cf794361e68b164255c30fd3dbfe3b
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Tue Mar 5 10:46:54 2024 +0100

    PARQUET-2437: Avoid flushing at Parquet writes after an exception (#1285)
---
 .../parquet/column/impl/ColumnWriteStoreBase.java  |  19 +-
 .../parquet/column/impl/ColumnWriterBase.java      | 183 ++++++++++++-------
 .../apache/parquet/column/impl/StatusManager.java  |  60 +++++++
 .../hadoop/InternalParquetRecordWriter.java        |  52 +++---
 .../filter2/recordlevel/PhoneBookWriter.java       |   6 +-
 .../parquet/hadoop/TestParquetWriterError.java     | 196 +++++++++++++++++++++
 6 files changed, 429 insertions(+), 87 deletions(-)

diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d127c1ac6..9bc772649 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -57,6 +57,7 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
   private final long thresholdTolerance;
   private long rowCount;
   private long rowCountForNextSizeCheck;
+  private StatusManager statusManager = StatusManager.create();
 
   // To be used by the deprecated constructor of ColumnWriteStoreV1
   @Deprecated
@@ -73,7 +74,7 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
       public ColumnWriter getColumnWriter(ColumnDescriptor path) {
         ColumnWriterBase column = columns.get(path);
         if (column == null) {
-          column = createColumnWriter(path, 
pageWriteStore.getPageWriter(path), null, props);
+          column = createColumnWriterBase(path, 
pageWriteStore.getPageWriter(path), null, props);
           columns.put(path, column);
         }
         return column;
@@ -87,7 +88,7 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
     Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-      mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+      mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, 
props));
     }
     this.columns = unmodifiableMap(mcolumns);
 
@@ -114,9 +115,9 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
       if (props.isBloomFilterEnabled(path)) {
         BloomFilterWriter bloomFilterWriter = 
bloomFilterWriteStore.getBloomFilterWriter(path);
-        mcolumns.put(path, createColumnWriter(path, pageWriter, 
bloomFilterWriter, props));
+        mcolumns.put(path, createColumnWriterBase(path, pageWriter, 
bloomFilterWriter, props));
       } else {
-        mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+        mcolumns.put(path, createColumnWriterBase(path, pageWriter, null, 
props));
       }
     }
     this.columns = unmodifiableMap(mcolumns);
@@ -131,6 +132,16 @@ abstract class ColumnWriteStoreBase implements 
ColumnWriteStore {
     };
   }
 
+  private ColumnWriterBase createColumnWriterBase(
+      ColumnDescriptor path,
+      PageWriter pageWriter,
+      BloomFilterWriter bloomFilterWriter,
+      ParquetProperties props) {
+    ColumnWriterBase columnWriterBase = createColumnWriter(path, pageWriter, 
bloomFilterWriter, props);
+    columnWriterBase.initStatusManager(statusManager);
+    return columnWriterBase;
+  }
+
   abstract ColumnWriterBase createColumnWriter(
       ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter 
bloomFilterWriter, ParquetProperties props);
 
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index e0d0e1a19..1b0f3ba4d 100644
--- 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.impl;
 
 import java.io.IOException;
+import java.util.Objects;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +53,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
 
   private long rowsWrittenSoFar = 0;
   private int pageRowCount;
+  private StatusManager statusManager = StatusManager.create();
 
   private final ColumnValueCollector collector;
 
@@ -74,6 +76,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
     this.collector = new ColumnValueCollector(path, bloomFilterWriter, props);
   }
 
+  void initStatusManager(StatusManager statusManager) {
+    this.statusManager = Objects.requireNonNull(statusManager);
+  }
+
   abstract ValuesWriter createRLWriter(ParquetProperties props, 
ColumnDescriptor path);
 
   abstract ValuesWriter createDLWriter(ParquetProperties props, 
ColumnDescriptor path);
@@ -103,10 +109,15 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void writeNull(int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(null, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    collector.writeNull(repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      collector.writeNull(repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   @Override
@@ -135,11 +146,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(double value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeDouble(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeDouble(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -152,11 +168,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(float value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeFloat(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeFloat(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -169,11 +190,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(Binary value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBytes(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeBytes(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -186,11 +212,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(boolean value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeBoolean(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeBoolean(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -203,11 +234,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(int value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeInteger(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeInteger(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -220,11 +256,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
   @Override
   public void write(long value, int repetitionLevel, int definitionLevel) {
     if (DEBUG) log(value, repetitionLevel, definitionLevel);
-    repetitionLevel(repetitionLevel);
-    definitionLevel(definitionLevel);
-    dataColumn.writeLong(value);
-    collector.write(value, repetitionLevel, definitionLevel);
-    ++valueCount;
+    try {
+      repetitionLevel(repetitionLevel);
+      definitionLevel(definitionLevel);
+      dataColumn.writeLong(value);
+      collector.write(value, repetitionLevel, definitionLevel);
+      ++valueCount;
+    } catch (Throwable e) {
+      statusManager.abort();
+      throw e;
+    }
   }
 
   /**
@@ -232,18 +273,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
    * Is called right after writePage
    */
   void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
-    if (dictionaryPage != null) {
-      if (DEBUG) LOG.debug("write dictionary");
-      try {
-        pageWriter.writeDictionaryPage(dictionaryPage);
-      } catch (IOException e) {
-        throw new ParquetEncodingException("could not write dictionary page 
for " + path, e);
-      }
-      dataColumn.resetDictionary();
+    if (statusManager.isAborted()) {
+      // We are aborting -> nothing to be done
+      return;
     }
+    try {
+      final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
+      if (dictionaryPage != null) {
+        if (DEBUG) LOG.debug("write dictionary");
+        try {
+          pageWriter.writeDictionaryPage(dictionaryPage);
+        } catch (IOException e) {
+          throw new ParquetEncodingException("could not write dictionary page 
for " + path, e);
+        }
+        dataColumn.resetDictionary();
+      }
 
-    collector.finalizeColumnChunk();
+      collector.finalizeColumnChunk();
+    } catch (Throwable t) {
+      statusManager.abort();
+      throw t;
+    }
   }
 
   /**
@@ -317,26 +367,35 @@ abstract class ColumnWriterBase implements ColumnWriter {
     if (valueCount == 0) {
       throw new ParquetEncodingException("writing empty page");
     }
-    this.rowsWrittenSoFar += pageRowCount;
-    if (DEBUG) LOG.debug("write page");
+    if (statusManager.isAborted()) {
+      // We are aborting -> nothing to be done
+      return;
+    }
     try {
-      writePage(
-          pageRowCount,
-          valueCount,
-          collector.getStatistics(),
-          collector.getSizeStatistics(),
-          repetitionLevelColumn,
-          definitionLevelColumn,
-          dataColumn);
-    } catch (IOException e) {
-      throw new ParquetEncodingException("could not write page for " + path, 
e);
+      this.rowsWrittenSoFar += pageRowCount;
+      if (DEBUG) LOG.debug("write page");
+      try {
+        writePage(
+            pageRowCount,
+            valueCount,
+            collector.getStatistics(),
+            collector.getSizeStatistics(),
+            repetitionLevelColumn,
+            definitionLevelColumn,
+            dataColumn);
+      } catch (IOException e) {
+        throw new ParquetEncodingException("could not write page for " + path, 
e);
+      }
+      repetitionLevelColumn.reset();
+      definitionLevelColumn.reset();
+      dataColumn.reset();
+      valueCount = 0;
+      collector.resetPageStatistics();
+      pageRowCount = 0;
+    } catch (Throwable t) {
+      statusManager.abort();
+      throw t;
     }
-    repetitionLevelColumn.reset();
-    definitionLevelColumn.reset();
-    dataColumn.reset();
-    valueCount = 0;
-    collector.resetPageStatistics();
-    pageRowCount = 0;
   }
 
   abstract void writePage(
diff --git 
a/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
new file mode 100644
index 000000000..a1ae1750a
--- /dev/null
+++ 
b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+/**
+ * Interface to manage the current error status. It is used to share the 
status of all the different (column, page,
+ * etc.) writer/reader instances.
+ */
+interface StatusManager {
+
+  /**
+   * Creates an instance of the default {@link StatusManager} implementation.
+   *
+   * @return the newly created {@link StatusManager} instance
+   */
+  static StatusManager create() {
+    return new StatusManager() {
+      private boolean aborted;
+
+      @Override
+      public void abort() {
+        aborted = true;
+      }
+
+      @Override
+      public boolean isAborted() {
+        return aborted;
+      }
+    };
+  }
+
+  /**
+   * To be invoked if the current process is to be aborted. For example in 
case an exception occurs during
+   * writing a page.
+   */
+  void abort();
+
+  /**
+   * Returns whether the current process is aborted.
+   *
+   * @return {@code true} if the current process is aborted, {@code false} 
otherwise
+   */
+  boolean isAborted();
+}
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 20809089a..0cc05d6d7 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -68,6 +68,7 @@ class InternalParquetRecordWriter<T> {
 
   private InternalFileEncryptor fileEncryptor;
   private int rowGroupOrdinal;
+  private boolean aborted;
 
   /**
    * @param parquetFileWriter the file to write to
@@ -127,6 +128,9 @@ class InternalParquetRecordWriter<T> {
   public void close() throws IOException, InterruptedException {
     if (!closed) {
       try {
+        if (aborted) {
+          return;
+        }
         flushRowGroupToStore();
         FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
         Map<String, String> finalMetadata = new HashMap<String, 
String>(extraMetaData);
@@ -144,9 +148,14 @@ class InternalParquetRecordWriter<T> {
   }
 
   public void write(T value) throws IOException, InterruptedException {
-    writeSupport.write(value);
-    ++recordCount;
-    checkBlockSizeReached();
+    try {
+      writeSupport.write(value);
+      ++recordCount;
+      checkBlockSizeReached();
+    } catch (Throwable t) {
+      aborted = true;
+      throw t;
+    }
   }
 
   /**
@@ -187,25 +196,28 @@ class InternalParquetRecordWriter<T> {
   }
 
   private void flushRowGroupToStore() throws IOException {
-    recordConsumer.flush();
-    LOG.debug("Flushing mem columnStore to file. allocated memory: {}", 
columnStore.getAllocatedSize());
-    if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
-      LOG.warn("Too much memory used: {}", columnStore.memUsageString());
-    }
+    try {
+      recordConsumer.flush();
+      LOG.debug("Flushing mem columnStore to file. allocated memory: {}", 
columnStore.getAllocatedSize());
+      if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
+        LOG.warn("Too much memory used: {}", columnStore.memUsageString());
+      }
 
-    if (recordCount > 0) {
-      rowGroupOrdinal++;
-      parquetFileWriter.startBlock(recordCount);
-      columnStore.flush();
-      pageStore.flushToFileWriter(parquetFileWriter);
-      recordCount = 0;
-      parquetFileWriter.endBlock();
-      this.nextRowGroupSize = 
Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+      if (recordCount > 0) {
+        rowGroupOrdinal++;
+        parquetFileWriter.startBlock(recordCount);
+        columnStore.flush();
+        pageStore.flushToFileWriter(parquetFileWriter);
+        recordCount = 0;
+        parquetFileWriter.endBlock();
+        this.nextRowGroupSize = 
Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+      }
+    } finally {
+      AutoCloseables.uncheckedClose(columnStore, pageStore, 
bloomFilterWriteStore);
+      columnStore = null;
+      pageStore = null;
+      bloomFilterWriteStore = null;
     }
-    AutoCloseables.uncheckedClose(columnStore, pageStore, 
bloomFilterWriteStore);
-    columnStore = null;
-    pageStore = null;
-    bloomFilterWriteStore = null;
   }
 
   long getRowGroupSizeThreshold() {
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 97d836aec..d4a77879e 100644
--- 
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -56,7 +56,11 @@ public class PhoneBookWriter {
       + "  }\n"
       + "}\n";
 
-  private static final MessageType schema = 
MessageTypeParser.parseMessageType(schemaString);
+  private static final MessageType schema = getSchema();
+
+  public static MessageType getSchema() {
+    return MessageTypeParser.parseMessageType(schemaString);
+  }
 
   public static class Location {
     private final Double lon;
diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
new file mode 100644
index 000000000..51fa90e1c
--- /dev/null
+++ 
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.codec.CleanUtil;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit test to check how Parquet writing behaves in case an error happens 
during the writes. We use an OOM because
+ * that is the most tricky to handle. In this case we shall avoid flushing 
since it may cause writing to already
+ * released memory spaces.
+ * <p>
+ * To catch the potential issue of writing into released ByteBuffer objects, 
direct memory allocation is used and at the
+ * release() call we actually release the related direct memory and zero the 
address inside the ByteBuffer object. As a
+ * result, a subsequent read/write call on the related ByteBuffer object will 
crash the whole jvm. (Unfortunately, there
+ * is no better way to test this.) To avoid crashing the test executor jvm, 
the code of this test is executed in a
+ * separate process.
+ */
+public class TestParquetWriterError {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Test
+  public void testInSeparateProcess() throws IOException, InterruptedException 
{
+    String outputFile = tmpFolder.newFile("out.parquet").toString();
+
+    String classpath = System.getProperty("java.class.path");
+    String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java")
+        .toAbsolutePath()
+        .toString();
+    Process process = new ProcessBuilder()
+        .command(javaPath, "-cp", classpath, Main.class.getName(), outputFile)
+        .redirectError(ProcessBuilder.Redirect.INHERIT)
+        .redirectOutput(ProcessBuilder.Redirect.INHERIT)
+        .start();
+    Assert.assertEquals(
+        "Test process exited with a non-zero return code. See previous logs 
for details.",
+        0,
+        process.waitFor());
+  }
+
+  /**
+   * The class to be used to execute this test in a separate process.
+   */
+  public static class Main {
+
+    private static final Random RANDOM = new Random(2024_02_27_14_20L);
+
+    // See the release() implementation in createAllocator()
+    private static final Field BUFFER_ADDRESS;
+
+    static {
+      Field bufferAddress;
+      try {
+        Class<?> bufferClass = Class.forName("java.nio.Buffer");
+        bufferAddress = bufferClass.getDeclaredField("address");
+        bufferAddress.setAccessible(true);
+      } catch (Exception e) {
+        // From java 17 it does not work, but we still test on earlier ones, 
so we are fine
+        bufferAddress = null;
+      }
+      BUFFER_ADDRESS = bufferAddress;
+    }
+
+    private static Group generateNext() {
+      PhoneBookWriter.Location location;
+      double chance = RANDOM.nextDouble();
+      if (chance < .45) {
+        location = new PhoneBookWriter.Location(RANDOM.nextDouble(), 
RANDOM.nextDouble());
+      } else if (chance < .9) {
+        location = new PhoneBookWriter.Location(RANDOM.nextDouble(), null);
+      } else {
+        location = null;
+      }
+      List<PhoneBookWriter.PhoneNumber> phoneNumbers;
+      if (RANDOM.nextDouble() < .1) {
+        phoneNumbers = null;
+      } else {
+        int n = RANDOM.nextInt(4);
+        phoneNumbers = new ArrayList<>(n);
+        for (int i = 0; i < n; ++i) {
+          String kind = RANDOM.nextDouble() < .1 ? null : "kind" + 
RANDOM.nextInt(5);
+          phoneNumbers.add(new PhoneBookWriter.PhoneNumber(RANDOM.nextInt(), 
kind));
+        }
+      }
+      String name = RANDOM.nextDouble() < .1 ? null : "name" + 
RANDOM.nextLong();
+      PhoneBookWriter.User user = new PhoneBookWriter.User(RANDOM.nextLong(), 
name, phoneNumbers, location);
+      return PhoneBookWriter.groupFromUser(user);
+    }
+
+    private static TrackingByteBufferAllocator createAllocator(final int 
oomAt) {
+      return TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator() {
+        private int counter = 0;
+
+        @Override
+        public ByteBuffer allocate(int size) {
+          if (++counter >= oomAt) {
+            Assert.assertEquals(
+                "There should not be any additional allocations after an OOM", 
oomAt, counter);
+            throw new OutOfMemoryError("Artificial OOM to fail write");
+          }
+          return super.allocate(size);
+        }
+
+        @Override
+        public void release(ByteBuffer b) {
+          CleanUtil.cleanDirectBuffer(b);
+
+          // It seems, if the size of the buffers is small, the related 
memory space is not given back to the
+          // OS, so writing to them after release does not cause any 
identifiable issue. Therefore, we
+          // try to explicitly zero the address, so the jvm crashes for a 
subsequent access.
+          try {
+            if (BUFFER_ADDRESS != null) {
+              BUFFER_ADDRESS.setLong(b, 0L);
+            }
+          } catch (IllegalAccessException e) {
+            throw new RuntimeException("Unable to zero direct ByteBuffer 
address", e);
+          }
+        }
+      });
+    }
+
+    public static void main(String[] args) throws Throwable {
+      // Codecs supported by the direct codec factory by default (without 
specific hadoop native libs)
+      CompressionCodecName[] codecs = {
+        CompressionCodecName.UNCOMPRESSED,
+        CompressionCodecName.GZIP,
+        CompressionCodecName.SNAPPY,
+        CompressionCodecName.ZSTD,
+        CompressionCodecName.LZ4_RAW
+      };
+      for (int cycle = 0; cycle < 50; ++cycle) {
+        try (TrackingByteBufferAllocator allocator = 
createAllocator(RANDOM.nextInt(100) + 1);
+            ParquetWriter<Group> writer = ExampleParquetWriter.builder(
+                    new LocalOutputFile(Paths.get(args[0])))
+                .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                .withType(PhoneBookWriter.getSchema())
+                .withAllocator(allocator)
+                .withCodecFactory(CodecFactory.createDirectCodecFactory(
+                    new Configuration(), allocator, 
ParquetProperties.DEFAULT_PAGE_SIZE))
+                // Also validating the different direct codecs which might 
also have issues if an OOM
+                // happens
+                .withCompressionCodec(codecs[RANDOM.nextInt(codecs.length)])
+                .build()) {
+          for (int i = 0; i < 100_000; ++i) {
+            writer.write(generateNext());
+          }
+          Assert.fail("An OOM should have been thrown");
+        } catch (OutOfMemoryError oom) {
+          Throwable[] suppressed = oom.getSuppressed();
+          // No exception should be suppressed after the expected OOM:
+          // It would mean that a close() call fails with an exception
+          if (suppressed != null && suppressed.length > 0) {
+            throw suppressed[0];
+          }
+        }
+      }
+    }
+  }
+}

Reply via email to