This is an automated email from the ASF dual-hosted git repository.
gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 0d09be6 PARQUET-1920: Fix Parquet writer's memory check interval calculation (#824)
0d09be6 is described below
commit 0d09be68c289a56e9e547949a3d1ee6df10ef794
Author: Ashish Singh <[email protected]>
AuthorDate: Mon Oct 12 01:22:32 2020 -0700
PARQUET-1920: Fix Parquet writer's memory check interval calculation (#824)
Fix Parquet writer's memory check interval calculation, and throw a helpful message when dealing with too-large column chunks.
---
.../src/main/java/org/apache/parquet/bytes/BytesInput.java | 13 ++++++++++---
.../apache/parquet/hadoop/InternalParquetRecordWriter.java | 14 +++++++-------
2 files changed, 17 insertions(+), 10 deletions(-)
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 807fd06..5d4efb5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -215,6 +215,13 @@ abstract public class BytesInput {
* @throws IOException if there is an exception reading
*/
public byte[] toByteArray() throws IOException {
+ long size = size();
+ if (size > Integer.MAX_VALUE) {
+      throw new IOException("Page size, " + size + ", is larger than allowed " + Integer.MAX_VALUE + "." +
+          " Usually caused by a Parquet writer writing too big column chunks on encountering highly skewed dataset." +
+          " Please set page.size.row.check.max to a lower value on the writer, default value is 10000." +
+          " You can try setting it to " + (10000 / (size / Integer.MAX_VALUE)) + " or lower.");
+    }
BAOS baos = new BAOS((int)size());
this.writeAllTo(baos);
LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0ecdabf..d2e505b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
class InternalParquetRecordWriter<T> {
  private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class);
- private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
- private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
private final ParquetFileWriter parquetFileWriter;
private final WriteSupport<T> writeSupport;
private final MessageType schema;
@@ -61,7 +58,7 @@ class InternalParquetRecordWriter<T> {
private boolean closed;
private long recordCount = 0;
- private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+ private long recordCountForNextMemCheck;
private long lastRowGroupEndPos = 0;
private ColumnWriteStore columnStore;
@@ -102,6 +99,7 @@ class InternalParquetRecordWriter<T> {
this.fileEncryptor = parquetFileWriter.getEncryptor();
this.rowGroupOrdinal = 0;
initStore();
+ recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck();
}
public ParquetMetadata getFooter() {
@@ -159,12 +157,14 @@ class InternalParquetRecordWriter<T> {
        LOG.debug("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
flushRowGroupToStore();
initStore();
-        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+        recordCountForNextMemCheck = min(max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
+            props.getMaxRowCountForPageSizeCheck());
this.lastRowGroupEndPos = parquetFileWriter.getPos();
} else {
recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
-            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
+            max(props.getMinRowCountForPageSizeCheck(),
+                (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
+            recordCount + props.getMaxRowCountForPageSizeCheck() // will not look more than max records ahead
            );
        LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck);
}