This is an automated email from the ASF dual-hosted git repository.

gabor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d09be6  PARQUET-1920: Fix Parquet writer's memory check interval 
calculation (#824)
0d09be6 is described below

commit 0d09be68c289a56e9e547949a3d1ee6df10ef794
Author: Ashish Singh <[email protected]>
AuthorDate: Mon Oct 12 01:22:32 2020 -0700

    PARQUET-1920: Fix Parquet writer's memory check interval calculation (#824)
    
    Fix Parquet writer's memory check interval calculation, and throw a helpful 
message when dealing with too-large column chunks.
---
 .../src/main/java/org/apache/parquet/bytes/BytesInput.java | 13 ++++++++++---
 .../apache/parquet/hadoop/InternalParquetRecordWriter.java | 14 +++++++-------
 2 files changed, 17 insertions(+), 10 deletions(-)

diff --git 
a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java 
b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
index 807fd06..5d4efb5 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -215,6 +215,13 @@ abstract public class BytesInput {
    * @throws IOException if there is an exception reading
    */
   public byte[] toByteArray() throws IOException {
+    long size = size();
+    if (size > Integer.MAX_VALUE) {
+      throw new IOException("Page size, " + size + ", is larger than allowed " 
+ Integer.MAX_VALUE + "." +
+        " Usually caused by a Parquet writer writing too big column chunks on 
encountering highly skewed dataset." +
+        " Please set page.size.row.check.max to a lower value on the writer, 
default value is 10000." +
+        " You can try setting it to " + (10000 / (size / Integer.MAX_VALUE)) + 
" or lower.");
+    }
     BAOS baos = new BAOS((int)size());
     this.writeAllTo(baos);
     LOG.debug("converted {} to byteArray of {} bytes", size() , baos.size());
diff --git 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index 0ecdabf..d2e505b 100644
--- 
a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ 
b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -44,9 +44,6 @@ import org.slf4j.LoggerFactory;
 class InternalParquetRecordWriter<T> {
   private static final Logger LOG = 
LoggerFactory.getLogger(InternalParquetRecordWriter.class);
 
-  private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
-  private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
-
   private final ParquetFileWriter parquetFileWriter;
   private final WriteSupport<T> writeSupport;
   private final MessageType schema;
@@ -61,7 +58,7 @@ class InternalParquetRecordWriter<T> {
   private boolean closed;
 
   private long recordCount = 0;
-  private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
+  private long recordCountForNextMemCheck;
   private long lastRowGroupEndPos = 0;
 
   private ColumnWriteStore columnStore;
@@ -102,6 +99,7 @@ class InternalParquetRecordWriter<T> {
     this.fileEncryptor = parquetFileWriter.getEncryptor();
     this.rowGroupOrdinal = 0;
     initStore();
+    recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck();
   }
 
   public ParquetMetadata getFooter() {
@@ -159,12 +157,14 @@ class InternalParquetRecordWriter<T> {
         LOG.debug("mem size {} > {}: flushing {} records to disk.", memSize, 
nextRowGroupSize, recordCount);
         flushRowGroupToStore();
         initStore();
-        recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, 
recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
+        recordCountForNextMemCheck = 
min(max(props.getMinRowCountForPageSizeCheck(), recordCount / 2),
+          props.getMaxRowCountForPageSizeCheck());
         this.lastRowGroupEndPos = parquetFileWriter.getPos();
       } else {
         recordCountForNextMemCheck = min(
-            max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + 
(long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
-            recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more 
than max records ahead
+            max(props.getMinRowCountForPageSizeCheck(),
+              (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 
2), // will check halfway
+            recordCount + props.getMaxRowCountForPageSizeCheck() // will not 
look more than max records ahead
             );
         LOG.debug("Checked mem at {} will check again at: {}", recordCount, 
recordCountForNextMemCheck);
       }

Reply via email to