This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new a93207f19 PARQUET-2437: Avoid flushing at Parquet writes after an
exception (#1285)
a93207f19 is described below
commit a93207f193cf794361e68b164255c30fd3dbfe3b
Author: Gabor Szadovszky <[email protected]>
AuthorDate: Tue Mar 5 10:46:54 2024 +0100
PARQUET-2437: Avoid flushing at Parquet writes after an exception (#1285)
---
.../parquet/column/impl/ColumnWriteStoreBase.java | 19 +-
.../parquet/column/impl/ColumnWriterBase.java | 183 ++++++++++++-------
.../apache/parquet/column/impl/StatusManager.java | 60 +++++++
.../hadoop/InternalParquetRecordWriter.java | 52 +++---
.../filter2/recordlevel/PhoneBookWriter.java | 6 +-
.../parquet/hadoop/TestParquetWriterError.java | 196 +++++++++++++++++++++
6 files changed, 429 insertions(+), 87 deletions(-)
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index d127c1ac6..9bc772649 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -57,6 +57,7 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
private final long thresholdTolerance;
private long rowCount;
private long rowCountForNextSizeCheck;
+ private StatusManager statusManager = StatusManager.create();
// To be used by the deprecated constructor of ColumnWriteStoreV1
@Deprecated
@@ -73,7 +74,7 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
public ColumnWriter getColumnWriter(ColumnDescriptor path) {
ColumnWriterBase column = columns.get(path);
if (column == null) {
- column = createColumnWriter(path,
pageWriteStore.getPageWriter(path), null, props);
+ column = createColumnWriterBase(path,
pageWriteStore.getPageWriter(path), null, props);
columns.put(path, column);
}
return column;
@@ -87,7 +88,7 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
Map<ColumnDescriptor, ColumnWriterBase> mcolumns = new TreeMap<>();
for (ColumnDescriptor path : schema.getColumns()) {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
- mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+ mcolumns.put(path, createColumnWriterBase(path, pageWriter, null,
props));
}
this.columns = unmodifiableMap(mcolumns);
@@ -114,9 +115,9 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
PageWriter pageWriter = pageWriteStore.getPageWriter(path);
if (props.isBloomFilterEnabled(path)) {
BloomFilterWriter bloomFilterWriter =
bloomFilterWriteStore.getBloomFilterWriter(path);
- mcolumns.put(path, createColumnWriter(path, pageWriter,
bloomFilterWriter, props));
+ mcolumns.put(path, createColumnWriterBase(path, pageWriter,
bloomFilterWriter, props));
} else {
- mcolumns.put(path, createColumnWriter(path, pageWriter, null, props));
+ mcolumns.put(path, createColumnWriterBase(path, pageWriter, null,
props));
}
}
this.columns = unmodifiableMap(mcolumns);
@@ -131,6 +132,16 @@ abstract class ColumnWriteStoreBase implements
ColumnWriteStore {
};
}
+ private ColumnWriterBase createColumnWriterBase(
+ ColumnDescriptor path,
+ PageWriter pageWriter,
+ BloomFilterWriter bloomFilterWriter,
+ ParquetProperties props) {
+ ColumnWriterBase columnWriterBase = createColumnWriter(path, pageWriter,
bloomFilterWriter, props);
+ columnWriterBase.initStatusManager(statusManager);
+ return columnWriterBase;
+ }
+
abstract ColumnWriterBase createColumnWriter(
ColumnDescriptor path, PageWriter pageWriter, BloomFilterWriter
bloomFilterWriter, ParquetProperties props);
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
index e0d0e1a19..1b0f3ba4d 100644
---
a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java
@@ -19,6 +19,7 @@
package org.apache.parquet.column.impl;
import java.io.IOException;
+import java.util.Objects;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
@@ -52,6 +53,7 @@ abstract class ColumnWriterBase implements ColumnWriter {
private long rowsWrittenSoFar = 0;
private int pageRowCount;
+ private StatusManager statusManager = StatusManager.create();
private final ColumnValueCollector collector;
@@ -74,6 +76,10 @@ abstract class ColumnWriterBase implements ColumnWriter {
this.collector = new ColumnValueCollector(path, bloomFilterWriter, props);
}
+ void initStatusManager(StatusManager statusManager) {
+ this.statusManager = Objects.requireNonNull(statusManager);
+ }
+
abstract ValuesWriter createRLWriter(ParquetProperties props,
ColumnDescriptor path);
abstract ValuesWriter createDLWriter(ParquetProperties props,
ColumnDescriptor path);
@@ -103,10 +109,15 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void writeNull(int repetitionLevel, int definitionLevel) {
if (DEBUG) log(null, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- collector.writeNull(repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ collector.writeNull(repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
@Override
@@ -135,11 +146,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(double value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeDouble(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeDouble(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -152,11 +168,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(float value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeFloat(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeFloat(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -169,11 +190,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(Binary value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeBytes(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBytes(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -186,11 +212,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(boolean value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeBoolean(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeBoolean(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -203,11 +234,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(int value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeInteger(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeInteger(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -220,11 +256,16 @@ abstract class ColumnWriterBase implements ColumnWriter {
@Override
public void write(long value, int repetitionLevel, int definitionLevel) {
if (DEBUG) log(value, repetitionLevel, definitionLevel);
- repetitionLevel(repetitionLevel);
- definitionLevel(definitionLevel);
- dataColumn.writeLong(value);
- collector.write(value, repetitionLevel, definitionLevel);
- ++valueCount;
+ try {
+ repetitionLevel(repetitionLevel);
+ definitionLevel(definitionLevel);
+ dataColumn.writeLong(value);
+ collector.write(value, repetitionLevel, definitionLevel);
+ ++valueCount;
+ } catch (Throwable e) {
+ statusManager.abort();
+ throw e;
+ }
}
/**
@@ -232,18 +273,27 @@ abstract class ColumnWriterBase implements ColumnWriter {
* Is called right after writePage
*/
void finalizeColumnChunk() {
- final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
- if (dictionaryPage != null) {
- if (DEBUG) LOG.debug("write dictionary");
- try {
- pageWriter.writeDictionaryPage(dictionaryPage);
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write dictionary page
for " + path, e);
- }
- dataColumn.resetDictionary();
+ if (statusManager.isAborted()) {
+ // We are aborting -> nothing to be done
+ return;
}
+ try {
+ final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
+ if (dictionaryPage != null) {
+ if (DEBUG) LOG.debug("write dictionary");
+ try {
+ pageWriter.writeDictionaryPage(dictionaryPage);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write dictionary page
for " + path, e);
+ }
+ dataColumn.resetDictionary();
+ }
- collector.finalizeColumnChunk();
+ collector.finalizeColumnChunk();
+ } catch (Throwable t) {
+ statusManager.abort();
+ throw t;
+ }
}
/**
@@ -317,26 +367,35 @@ abstract class ColumnWriterBase implements ColumnWriter {
if (valueCount == 0) {
throw new ParquetEncodingException("writing empty page");
}
- this.rowsWrittenSoFar += pageRowCount;
- if (DEBUG) LOG.debug("write page");
+ if (statusManager.isAborted()) {
+ // We are aborting -> nothing to be done
+ return;
+ }
try {
- writePage(
- pageRowCount,
- valueCount,
- collector.getStatistics(),
- collector.getSizeStatistics(),
- repetitionLevelColumn,
- definitionLevelColumn,
- dataColumn);
- } catch (IOException e) {
- throw new ParquetEncodingException("could not write page for " + path,
e);
+ this.rowsWrittenSoFar += pageRowCount;
+ if (DEBUG) LOG.debug("write page");
+ try {
+ writePage(
+ pageRowCount,
+ valueCount,
+ collector.getStatistics(),
+ collector.getSizeStatistics(),
+ repetitionLevelColumn,
+ definitionLevelColumn,
+ dataColumn);
+ } catch (IOException e) {
+ throw new ParquetEncodingException("could not write page for " + path,
e);
+ }
+ repetitionLevelColumn.reset();
+ definitionLevelColumn.reset();
+ dataColumn.reset();
+ valueCount = 0;
+ collector.resetPageStatistics();
+ pageRowCount = 0;
+ } catch (Throwable t) {
+ statusManager.abort();
+ throw t;
}
- repetitionLevelColumn.reset();
- definitionLevelColumn.reset();
- dataColumn.reset();
- valueCount = 0;
- collector.resetPageStatistics();
- pageRowCount = 0;
}
abstract void writePage(
diff --git
a/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
new file mode 100644
index 000000000..a1ae1750a
--- /dev/null
+++
b/parquet-column/src/main/java/org/apache/parquet/column/impl/StatusManager.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.column.impl;
+
+/**
+ * Interface to manage the current error status. It is used to share the
status of all the different (column, page,
+ * etc.) writer/reader instances.
+ */
+interface StatusManager {
+
+ /**
+ * Creates an instance of the default {@link StatusManager} implementation.
+ *
+ * @return the newly created {@link StatusManager} instance
+ */
+ static StatusManager create() {
+ return new StatusManager() {
+ private boolean aborted;
+
+ @Override
+ public void abort() {
+ aborted = true;
+ }
+
+ @Override
+ public boolean isAborted() {
+ return aborted;
+ }
+ };
+ }
+
+ /**
+ * To be invoked if the current process is to be aborted. For example, in
case an exception occurred during
+ * writing a page.
+ */
+ void abort();
+
+ /**
+ * Returns whether the current process is aborted.
+ *
+ * @return {@code true} if the current process is aborted, {@code false}
otherwise
+ */
+ boolean isAborted();
+}
diff --git
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 20809089a..0cc05d6d7 100644
---
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -68,6 +68,7 @@ class InternalParquetRecordWriter<T> {
private InternalFileEncryptor fileEncryptor;
private int rowGroupOrdinal;
+ private boolean aborted;
/**
* @param parquetFileWriter the file to write to
@@ -127,6 +128,9 @@ class InternalParquetRecordWriter<T> {
public void close() throws IOException, InterruptedException {
if (!closed) {
try {
+ if (aborted) {
+ return;
+ }
flushRowGroupToStore();
FinalizedWriteContext finalWriteContext = writeSupport.finalizeWrite();
Map<String, String> finalMetadata = new HashMap<String,
String>(extraMetaData);
@@ -144,9 +148,14 @@ class InternalParquetRecordWriter<T> {
}
public void write(T value) throws IOException, InterruptedException {
- writeSupport.write(value);
- ++recordCount;
- checkBlockSizeReached();
+ try {
+ writeSupport.write(value);
+ ++recordCount;
+ checkBlockSizeReached();
+ } catch (Throwable t) {
+ aborted = true;
+ throw t;
+ }
}
/**
@@ -187,25 +196,28 @@ class InternalParquetRecordWriter<T> {
}
private void flushRowGroupToStore() throws IOException {
- recordConsumer.flush();
- LOG.debug("Flushing mem columnStore to file. allocated memory: {}",
columnStore.getAllocatedSize());
- if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
- LOG.warn("Too much memory used: {}", columnStore.memUsageString());
- }
+ try {
+ recordConsumer.flush();
+ LOG.debug("Flushing mem columnStore to file. allocated memory: {}",
columnStore.getAllocatedSize());
+ if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
+ LOG.warn("Too much memory used: {}", columnStore.memUsageString());
+ }
- if (recordCount > 0) {
- rowGroupOrdinal++;
- parquetFileWriter.startBlock(recordCount);
- columnStore.flush();
- pageStore.flushToFileWriter(parquetFileWriter);
- recordCount = 0;
- parquetFileWriter.endBlock();
- this.nextRowGroupSize =
Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+ if (recordCount > 0) {
+ rowGroupOrdinal++;
+ parquetFileWriter.startBlock(recordCount);
+ columnStore.flush();
+ pageStore.flushToFileWriter(parquetFileWriter);
+ recordCount = 0;
+ parquetFileWriter.endBlock();
+ this.nextRowGroupSize =
Math.min(parquetFileWriter.getNextRowGroupSize(), rowGroupSizeThreshold);
+ }
+ } finally {
+ AutoCloseables.uncheckedClose(columnStore, pageStore,
bloomFilterWriteStore);
+ columnStore = null;
+ pageStore = null;
+ bloomFilterWriteStore = null;
}
- AutoCloseables.uncheckedClose(columnStore, pageStore,
bloomFilterWriteStore);
- columnStore = null;
- pageStore = null;
- bloomFilterWriteStore = null;
}
long getRowGroupSizeThreshold() {
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 97d836aec..d4a77879e 100644
---
a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -56,7 +56,11 @@ public class PhoneBookWriter {
+ " }\n"
+ "}\n";
- private static final MessageType schema =
MessageTypeParser.parseMessageType(schemaString);
+ private static final MessageType schema = getSchema();
+
+ public static MessageType getSchema() {
+ return MessageTypeParser.parseMessageType(schemaString);
+ }
public static class Location {
private final Double lon;
diff --git
a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
new file mode 100644
index 000000000..51fa90e1c
--- /dev/null
+++
b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriterError.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+import org.apache.parquet.bytes.TrackingByteBufferAllocator;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.codec.CleanUtil;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.LocalOutputFile;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Unit test to check how Parquet writing behaves in case an error happens
during the writes. We use an OOM because
+ * that is the most tricky to handle. In this case we shall avoid flushing
since it may cause writing to already
+ * released memory spaces.
+ * <p>
+ * To catch the potential issue of writing into released ByteBuffer objects,
direct memory allocation is used and at the
+ * release() call we actually release the related direct memory and zero the
address inside the ByteBuffer object. As a
+ * result, a subsequent read/write call on the related ByteBuffer object will
crash the whole jvm. (Unfortunately, there
+ * is no better way to test this.) To avoid crashing the test executor jvm,
the code of this test is executed in a
+ * separate process.
+ */
+public class TestParquetWriterError {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Test
+ public void testInSeparateProcess() throws IOException, InterruptedException
{
+ String outputFile = tmpFolder.newFile("out.parquet").toString();
+
+ String classpath = System.getProperty("java.class.path");
+ String javaPath = Paths.get(System.getProperty("java.home"), "bin", "java")
+ .toAbsolutePath()
+ .toString();
+ Process process = new ProcessBuilder()
+ .command(javaPath, "-cp", classpath, Main.class.getName(), outputFile)
+ .redirectError(ProcessBuilder.Redirect.INHERIT)
+ .redirectOutput(ProcessBuilder.Redirect.INHERIT)
+ .start();
+ Assert.assertEquals(
+ "Test process exited with a non-zero return code. See previous logs
for details.",
+ 0,
+ process.waitFor());
+ }
+
+ /**
+ * The class to be used to execute this test in a separate process.
+ */
+ public static class Main {
+
+ private static final Random RANDOM = new Random(2024_02_27_14_20L);
+
+ // See the release() implementation in createAllocator()
+ private static final Field BUFFER_ADDRESS;
+
+ static {
+ Field bufferAddress;
+ try {
+ Class<?> bufferClass = Class.forName("java.nio.Buffer");
+ bufferAddress = bufferClass.getDeclaredField("address");
+ bufferAddress.setAccessible(true);
+ } catch (Exception e) {
+ // From java 17 it does not work, but we still test on earlier ones,
so we are fine
+ bufferAddress = null;
+ }
+ BUFFER_ADDRESS = bufferAddress;
+ }
+
+ private static Group generateNext() {
+ PhoneBookWriter.Location location;
+ double chance = RANDOM.nextDouble();
+ if (chance < .45) {
+ location = new PhoneBookWriter.Location(RANDOM.nextDouble(),
RANDOM.nextDouble());
+ } else if (chance < .9) {
+ location = new PhoneBookWriter.Location(RANDOM.nextDouble(), null);
+ } else {
+ location = null;
+ }
+ List<PhoneBookWriter.PhoneNumber> phoneNumbers;
+ if (RANDOM.nextDouble() < .1) {
+ phoneNumbers = null;
+ } else {
+ int n = RANDOM.nextInt(4);
+ phoneNumbers = new ArrayList<>(n);
+ for (int i = 0; i < n; ++i) {
+ String kind = RANDOM.nextDouble() < .1 ? null : "kind" +
RANDOM.nextInt(5);
+ phoneNumbers.add(new PhoneBookWriter.PhoneNumber(RANDOM.nextInt(),
kind));
+ }
+ }
+ String name = RANDOM.nextDouble() < .1 ? null : "name" +
RANDOM.nextLong();
+ PhoneBookWriter.User user = new PhoneBookWriter.User(RANDOM.nextLong(),
name, phoneNumbers, location);
+ return PhoneBookWriter.groupFromUser(user);
+ }
+
+ private static TrackingByteBufferAllocator createAllocator(final int
oomAt) {
+ return TrackingByteBufferAllocator.wrap(new DirectByteBufferAllocator() {
+ private int counter = 0;
+
+ @Override
+ public ByteBuffer allocate(int size) {
+ if (++counter >= oomAt) {
+ Assert.assertEquals(
+ "There should not be any additional allocations after an OOM",
oomAt, counter);
+ throw new OutOfMemoryError("Artificial OOM to fail write");
+ }
+ return super.allocate(size);
+ }
+
+ @Override
+ public void release(ByteBuffer b) {
+ CleanUtil.cleanDirectBuffer(b);
+
+ // It seems that, if the size of the buffers is small, the related
memory space is not given back to the
+ // OS, so writing to them after release does not cause any
identifiable issue. Therefore, we
+ // try to explicitly zero the address, so the jvm crashes for a
subsequent access.
+ try {
+ if (BUFFER_ADDRESS != null) {
+ BUFFER_ADDRESS.setLong(b, 0L);
+ }
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Unable to zero direct ByteBuffer
address", e);
+ }
+ }
+ });
+ }
+
+ public static void main(String[] args) throws Throwable {
+ // Codecs supported by the direct codec factory by default (without
specific hadoop native libs)
+ CompressionCodecName[] codecs = {
+ CompressionCodecName.UNCOMPRESSED,
+ CompressionCodecName.GZIP,
+ CompressionCodecName.SNAPPY,
+ CompressionCodecName.ZSTD,
+ CompressionCodecName.LZ4_RAW
+ };
+ for (int cycle = 0; cycle < 50; ++cycle) {
+ try (TrackingByteBufferAllocator allocator =
createAllocator(RANDOM.nextInt(100) + 1);
+ ParquetWriter<Group> writer = ExampleParquetWriter.builder(
+ new LocalOutputFile(Paths.get(args[0])))
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withType(PhoneBookWriter.getSchema())
+ .withAllocator(allocator)
+ .withCodecFactory(CodecFactory.createDirectCodecFactory(
+ new Configuration(), allocator,
ParquetProperties.DEFAULT_PAGE_SIZE))
+ // Also validating the different direct codecs which might
also have issues if an OOM
+ // happens
+ .withCompressionCodec(codecs[RANDOM.nextInt(codecs.length)])
+ .build()) {
+ for (int i = 0; i < 100_000; ++i) {
+ writer.write(generateNext());
+ }
+ Assert.fail("An OOM should have been thrown");
+ } catch (OutOfMemoryError oom) {
+ Throwable[] suppressed = oom.getSuppressed();
+ // No exception should be suppressed after the expected OOM:
+ // It would mean that a close() call fails with an exception
+ if (suppressed != null && suppressed.length > 0) {
+ throw suppressed[0];
+ }
+ }
+ }
+ }
+ }
+}