This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
     new fc6f6aee6  ORC-1204. ORC MapReduce writer to flush when long arrays

fc6f6aee6 is described below

commit fc6f6aee6571df6e53de5adba7cc0b2670c5df91
Author: Owen O'Malley <oomal...@linkedin.com>
AuthorDate: Thu Jun 16 11:08:07 2022 -0700

    ORC-1204. ORC MapReduce writer to flush when long arrays

    ### What changes were proposed in this pull request?

    This adds a new configuration for the row-by-row writer that sets a limit on
    how many items will be buffered in the VectorizedRowBatch before it is sent
    to the ORC Writer. This does not change the behavior of the core writer,
    where the application already has control over the size of the batch.

    ### Why are the changes needed?

    We are getting OOMs when writing rows with long arrays with the row-by-row
    writer. Heap dumps show that it is the ColumnVector inside the array that is
    taking large amounts of memory.

    ### How was this patch tested?

    Updated a couple of unit tests and added two more.

    Closes #1161 from omalley/orc-1204.

    Authored-by: Owen O'Malley <oomal...@linkedin.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 java/core/src/java/org/apache/orc/OrcConf.java      |  6 +-
 .../apache/orc/mapred/OrcMapredRecordWriter.java    | 82 +++++++++++++++++++++-
 .../org/apache/orc/mapred/OrcOutputFormat.java      |  4 +-
 .../orc/mapreduce/OrcMapreduceRecordWriter.java     | 19 ++++-
 .../org/apache/orc/mapreduce/OrcOutputFormat.java   |  5 +-
 .../org/apache/orc/mapred/TestOrcOutputFormat.java  | 22 +-----
 .../org/apache/orc/mapred/TestOrcRecordWriter.java  | 81 +++++++++++++++++++++
 .../mapreduce/TestMapreduceOrcOutputFormat.java     | 22 +-----
 .../apache/orc/mapreduce/TestOrcRecordWriter.java   | 56 +++++++++++++++
 9 files changed, 252 insertions(+), 45 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index aef00fa63..78fdd3006 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -227,7 +227,11 @@ public enum OrcConf {
       "is the default."),
   ROW_BATCH_SIZE("orc.row.batch.size", "orc.row.batch.size", 1024,
       "The number of rows to include in a orc vectorized reader batch. " +
-      "The value should be carefully chosen to minimize overhead and avoid OOMs in reading data.")
+      "The value should be carefully chosen to minimize overhead and avoid OOMs in reading data."),
+  ROW_BATCH_CHILD_LIMIT("orc.row.child.limit", "orc.row.child.limit",
+      1024 * 32, "The maximum number of child elements to buffer before "+
+      "the ORC row writer writes the batch to the file."
+  )
   ;
 
   private final String attribute;
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
index da54f0e9a..6cf7539f1 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordWriter.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
@@ -45,10 +46,12 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.RecordWriter;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcConf;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +64,9 @@ public class OrcMapredRecordWriter<V extends Writable>
   private final VectorizedRowBatch batch;
   private final TypeDescription schema;
   private final boolean isTopStruct;
+  private final List<MultiValuedColumnVector> variableLengthColumns =
+      new ArrayList<>();
+  private final int maxChildLength;
 
   public OrcMapredRecordWriter(Writer writer) {
     this(writer, VectorizedRowBatch.DEFAULT_SIZE);
@@ -68,10 +74,71 @@ public class OrcMapredRecordWriter<V extends Writable>
 
   public OrcMapredRecordWriter(Writer writer,
                                int rowBatchSize) {
+    this(writer, rowBatchSize,
+        (Integer) OrcConf.ROW_BATCH_CHILD_LIMIT.getDefaultValue());
+  }
+
+  public OrcMapredRecordWriter(Writer writer,
+                               int rowBatchSize,
+                               int maxChildLength) {
     this.writer = writer;
     schema = writer.getSchema();
     this.batch = schema.createRowBatch(rowBatchSize);
+    addVariableLengthColumns(variableLengthColumns, batch);
     isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+    this.maxChildLength = maxChildLength;
+  }
+
+  /**
+   * Find variable length columns and add them to the list.
+   * @param result the list to be appended to
+   * @param vector the column vector to scan
+   */
+  private static
+      void addVariableLengthColumns(List<MultiValuedColumnVector> result,
+                                    ColumnVector vector) {
+    switch (vector.type) {
+      case LIST: {
+        ListColumnVector cv = (ListColumnVector) vector;
+        result.add(cv);
+        addVariableLengthColumns(result, cv.child);
+        break;
+      }
+      case MAP: {
+        MapColumnVector cv = (MapColumnVector) vector;
+        result.add(cv);
+        addVariableLengthColumns(result, cv.keys);
+        addVariableLengthColumns(result, cv.values);
+        break;
+      }
+      case STRUCT: {
+        for(ColumnVector child: ((StructColumnVector) vector).fields) {
+          addVariableLengthColumns(result, child);
+        }
+        break;
+      }
+      case UNION: {
+        for(ColumnVector child: ((UnionColumnVector) vector).fields) {
+          addVariableLengthColumns(result, child);
+        }
+        break;
+      }
+      default:
+        break;
+    }
+  }
+
+  /**
+   * Find variable length columns and add them to the list.
+   * @param result the list to be appended to
+   * @param batch the batch to scan
+   */
+  public static
+      void addVariableLengthColumns(List<MultiValuedColumnVector> result,
+                                    VectorizedRowBatch batch) {
+    for(ColumnVector cv: batch.cols) {
+      addVariableLengthColumns(result, cv);
+    }
   }
 
   static void setLongValue(ColumnVector vector, int row, long value) {
@@ -264,10 +331,23 @@ public class OrcMapredRecordWriter<V extends Writable>
     }
   }
 
+  /**
+   * Get the longest variable length vector in a column vector
+   * @return the length of the longest sub-column
+   */
+  public static int getMaxChildLength(List<MultiValuedColumnVector> columns) {
+    int result = 0;
+    for(MultiValuedColumnVector cv: columns) {
+      result = Math.max(result, cv.childCount);
+    }
+    return result;
+  }
+
   @Override
   public void write(NullWritable nullWritable, V v) throws IOException {
     // if the batch is full, write it out.
-    if (batch.size == batch.getMaxSize()) {
+    if (batch.size == batch.getMaxSize() ||
+        getMaxChildLength(variableLengthColumns) >= maxChildLength) {
       writer.addRowBatch(batch);
       batch.reset();
     }
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
index c1b54c173..fc323ab98 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcOutputFormat.java
@@ -75,6 +75,8 @@ public class OrcOutputFormat<V extends Writable>
     Path path = getTaskOutputPath(conf, name);
     Writer writer = OrcFile.createWriter(path,
         buildOptions(conf).fileSystem(fileSystem));
-    return new OrcMapredRecordWriter<>(writer);
+    return new OrcMapredRecordWriter<>(writer,
+        OrcConf.ROW_BATCH_SIZE.getInt(conf),
+        OrcConf.ROW_BATCH_CHILD_LIMIT.getInt(conf));
   }
 }
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
index f04b98208..3cd671ec1 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordWriter.java
@@ -18,11 +18,13 @@
 
 package org.apache.orc.mapreduce;
 
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.orc.OrcConf;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
 import org.apache.orc.mapred.OrcKey;
@@ -31,6 +33,8 @@ import org.apache.orc.mapred.OrcStruct;
 import org.apache.orc.mapred.OrcValue;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class OrcMapreduceRecordWriter<V extends Writable>
     extends RecordWriter<NullWritable, V> {
@@ -39,6 +43,9 @@ public class OrcMapreduceRecordWriter<V extends Writable>
   private final VectorizedRowBatch batch;
   private final TypeDescription schema;
   private final boolean isTopStruct;
+  private final List<MultiValuedColumnVector> variableLengthColumns =
+      new ArrayList<>();
+  private final int maxChildLength;
 
   public OrcMapreduceRecordWriter(Writer writer) {
     this(writer, VectorizedRowBatch.DEFAULT_SIZE);
@@ -46,16 +53,26 @@ public class OrcMapreduceRecordWriter<V extends Writable>
 
   public OrcMapreduceRecordWriter(Writer writer,
                                   int rowBatchSize) {
+    this(writer, rowBatchSize,
+        (Integer) OrcConf.ROW_BATCH_CHILD_LIMIT.getDefaultValue());
+  }
+
+  public OrcMapreduceRecordWriter(Writer writer,
+                                  int rowBatchSize,
+                                  int maxChildLength) {
     this.writer = writer;
     schema = writer.getSchema();
     this.batch = schema.createRowBatch(rowBatchSize);
     isTopStruct = schema.getCategory() == TypeDescription.Category.STRUCT;
+    OrcMapredRecordWriter.addVariableLengthColumns(variableLengthColumns, batch);
+    this.maxChildLength = maxChildLength;
   }
 
   @Override
   public void write(NullWritable nullWritable, V v) throws IOException {
     // if the batch is full, write it out.
-    if (batch.size == batch.getMaxSize()) {
+    if (batch.size == batch.getMaxSize() ||
+        OrcMapredRecordWriter.getMaxChildLength(variableLengthColumns) >= maxChildLength) {
       writer.addRowBatch(batch);
       batch.reset();
     }
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
index a0f36cd7d..87dd9a83a 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcOutputFormat.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.Writer;
 
@@ -49,7 +50,9 @@ public class OrcOutputFormat<V extends Writable>
     Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
     Writer writer = OrcFile.createWriter(filename,
         org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
-    return new OrcMapreduceRecordWriter<V>(writer);
+    return new OrcMapreduceRecordWriter<V>(writer,
+        OrcConf.ROW_BATCH_SIZE.getInt(conf),
+        OrcConf.ROW_BATCH_CHILD_LIMIT.getInt(conf));
   }
 
   @Override
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
index 64799177a..a95d73a3f 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -310,14 +310,14 @@ public class TestOrcOutputFormat {
   public void testOrcOutputFormatWithRowBatchSize() throws Exception {
     conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
     conf.set("mapreduce.task.attempt.id", "attempt_jt0_0_m_0_0");
-    conf.setInt(OrcOutputFormatWithRowBatchSize.ROW_BATCH_SIZE, 128);
+    OrcConf.ROW_BATCH_SIZE.setInt(conf, 128);
     String TYPE_STRING = "struct<i:int,s:string>";
     OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
     conf.setOutputCommitter(NullOutputCommitter.class);
     TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
     OrcKey key = new OrcKey(new OrcStruct(schema));
     RecordWriter<NullWritable, Writable> writer =
-        new OrcOutputFormatWithRowBatchSize<>().getRecordWriter(fs, conf, "key.orc",
+        new OrcOutputFormat<>().getRecordWriter(fs, conf, "key.orc",
             Reporter.NULL);
     NullWritable nada = NullWritable.get();
     for(int r=0; r < 2000; ++r) {
@@ -332,22 +332,4 @@ public class TestOrcOutputFormat {
     assertEquals(2000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
-
-  private static class OrcOutputFormatWithRowBatchSize<V extends Writable> extends OrcOutputFormat {
-    public static final String ROW_BATCH_SIZE = OrcConf.ROW_BATCH_SIZE.getAttribute();
-
-    @Override
-    public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
-                                                          JobConf conf,
-                                                          String name,
-                                                          Progressable progressable
-                                                          ) throws IOException {
-      Path path = getTaskOutputPath(conf, name);
-      Writer writer = OrcFile.createWriter(path,
-          buildOptions(conf).fileSystem(fileSystem));
-      //Ensure that orc.row.batch.size config is set in the JobConf
-      int rowBatchSize = Integer.parseInt(conf.get(ROW_BATCH_SIZE));
-      return new OrcMapredRecordWriter<>(writer, rowBatchSize);
-    }
-  }
 }
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcRecordWriter.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcRecordWriter.java
new file mode 100644
index 000000000..8efe48559
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcRecordWriter.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapred;
+
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MultiValuedColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+
+public class TestOrcRecordWriter {
+
+  /**
+   * Test finding the multi-value columns.
+   */
+  @Test
+  public void testFindingMultiValueColumns() {
+    // Make sure that we find all the multi-value columns from a batch.
+    TypeDescription schema = TypeDescription.fromString("struct<x:struct<" +
+        "x:uniontype<int,array<array<int>>,map<array<int>,array<int>>>>>");
+    VectorizedRowBatch batch = schema.createRowBatchV2();
+    List<MultiValuedColumnVector> result = new ArrayList<>();
+    OrcMapredRecordWriter.addVariableLengthColumns(result, batch);
+    assertEquals(5, result.size());
+    assertEquals(ColumnVector.Type.LIST, result.get(0).type);
+    assertEquals(ColumnVector.Type.LIST, result.get(1).type);
+    assertEquals(ColumnVector.Type.MAP, result.get(2).type);
+    assertEquals(ColumnVector.Type.LIST, result.get(3).type);
+    assertEquals(ColumnVector.Type.LIST, result.get(4).type);
+  }
+
+  /**
+   * Test the child element limit flushes the writer.
+   */
+  @Test
+  public void testChildElementLimit() throws Exception {
+    TypeDescription schema = TypeDescription.fromString("struct<x:array<int>>");
+    Writer mockWriter = Mockito.mock(Writer.class);
+    Mockito.when(mockWriter.getSchema()).thenReturn(schema);
+    OrcMapredRecordWriter<OrcStruct> recordWriter =
+        new OrcMapredRecordWriter<>(mockWriter, 1024, 10);
+    OrcStruct record = new OrcStruct(schema);
+    OrcList list = new OrcList(schema.getChildren().get(0));
+    record.setFieldValue(0, list);
+    list.add(new IntWritable(1));
+    list.add(new IntWritable(2));
+    Mockito.verify(mockWriter, times(0)).addRowBatch(any());
+    for(int i=0; i < 11; i++) {
+      recordWriter.write(null, record);
+    }
+    // We've written 11 rows with 2 integers each, so we should have written
+    // 2 batches of 5 rows.
+    Mockito.verify(mockWriter, times(2)).addRowBatch(any());
+  }
+}
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
index 897bce668..e526b8630 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -295,7 +295,7 @@ public class TestMapreduceOrcOutputFormat {
   @Test
   public void testOrcOutputFormatWithRowBatchSize() throws Exception {
     conf.set("mapreduce.output.fileoutputformat.outputdir", workDir.toString());
-    conf.setInt(OrcOutputFormatWithRowBatchSize.ROW_BATCH_SIZE, 128);
+    OrcConf.ROW_BATCH_SIZE.setInt(conf, 128);
     String TYPE_STRING = "struct<i:int,s:string>";
     OrcConf.MAPRED_OUTPUT_SCHEMA.setString(conf, TYPE_STRING);
     conf.setBoolean(OrcOutputFormat.SKIP_TEMP_DIRECTORY, true);
@@ -304,7 +304,7 @@ public class TestMapreduceOrcOutputFormat {
     TypeDescription schema = TypeDescription.fromString(TYPE_STRING);
     OrcKey key = new OrcKey(new OrcStruct(schema));
     RecordWriter<NullWritable, Writable> writer =
-        new OrcOutputFormatWithRowBatchSize<>().getRecordWriter(attemptContext);
+        new OrcOutputFormat<>().getRecordWriter(attemptContext);
     NullWritable nada = NullWritable.get();
     for(int r=0; r < 2000; ++r) {
       ((OrcStruct) key.key).setAllFields(new IntWritable(r),
@@ -318,22 +318,4 @@ public class TestMapreduceOrcOutputFormat {
     assertEquals(2000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
-
-  private static class OrcOutputFormatWithRowBatchSize<V extends Writable> extends OrcOutputFormat {
-    private static final String EXTENSION = ".orc";
-    public static final String ROW_BATCH_SIZE = OrcConf.ROW_BATCH_SIZE.getAttribute();
-
-    @Override
-    public RecordWriter<NullWritable, V>
-        getRecordWriter(TaskAttemptContext taskAttemptContext
-                        ) throws IOException {
-      Configuration conf = taskAttemptContext.getConfiguration();
-      Path filename = getDefaultWorkFile(taskAttemptContext, EXTENSION);
-      Writer writer = OrcFile.createWriter(filename,
-          org.apache.orc.mapred.OrcOutputFormat.buildOptions(conf));
-      //Ensure that orc.row.batch.size confing is set in the JobConf
-      int rowBatchSize = Integer.parseInt(conf.get(ROW_BATCH_SIZE));
-      return new OrcMapreduceRecordWriter<>(writer, rowBatchSize);
-    }
-  }
 }
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestOrcRecordWriter.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestOrcRecordWriter.java
new file mode 100644
index 000000000..2dd5e01a5
--- /dev/null
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestOrcRecordWriter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.mapreduce;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcStruct;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
+
+public class TestOrcRecordWriter {
+  /**
+   * Test the child element limit flushes the writer.
+   */
+  @Test
+  public void testChildElementLimit() throws Exception {
+    TypeDescription schema = TypeDescription.fromString("struct<x:array<int>>");
+    Writer mockWriter = Mockito.mock(Writer.class);
+    Mockito.when(mockWriter.getSchema()).thenReturn(schema);
+    OrcMapreduceRecordWriter<OrcStruct> recordWriter =
+        new OrcMapreduceRecordWriter<>(mockWriter, 1024, 10);
+    OrcStruct record = new OrcStruct(schema);
+    OrcList list = new OrcList(schema.getChildren().get(0));
+    record.setFieldValue(0, list);
+    list.add(new IntWritable(1));
+    list.add(new IntWritable(2));
+    Mockito.verify(mockWriter, times(0)).addRowBatch(any());
+    for(int i=0; i < 11; i++) {
+      recordWriter.write(null, record);
+    }
+    // We've written 11 rows with 2 integers each, so we should have written
+    // 2 batches of 5 rows.
+    Mockito.verify(mockWriter, times(2)).addRowBatch(any());
+  }
+}
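
For readers wiring this into a job, the sketch below shows how the new limit could be set alongside the existing batch size. It is illustrative only and not part of the patch: the OrcChildLimitExample class name and the chosen values are made up, while OrcConf.ROW_BATCH_SIZE, OrcConf.ROW_BATCH_CHILD_LIMIT, and their setInt helper come from the code above.

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.orc.OrcConf;

    // Hypothetical job setup showing the two knobs the row-by-row writers consult.
    public class OrcChildLimitExample {
      public static void main(String[] args) {
        JobConf conf = new JobConf();

        // Keep the usual number of rows per VectorizedRowBatch (1024 is the default).
        OrcConf.ROW_BATCH_SIZE.setInt(conf, 1024);

        // Flush the batch early once any list/map column has buffered this many
        // child elements (default 32768), so rows with very long arrays cannot
        // pile up an arbitrarily large ColumnVector on the heap.
        OrcConf.ROW_BATCH_CHILD_LIMIT.setInt(conf, 4096);
      }
    }

With the defaults (1024 rows, 32768 child elements) nothing changes for typical data; the limit only triggers when individual rows carry very large collections.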