This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
     new e815318  Parquet: Fix vectorized reads with dictionary and non-dictionary row groups (#1388)
e815318 is described below
commit e815318a5378c126e05888c55c7e02955b0946f0
Author: Samarth Jain <[email protected]>
AuthorDate: Fri Aug 28 10:02:52 2020 -0700
    Parquet: Fix vectorized reads with dictionary and non-dictionary row groups (#1388)
---
.../arrow/vectorized/VectorizedArrowReader.java | 11 ++++--
.../java/org/apache/iceberg/parquet/Parquet.java | 23 +++++++++++
...estParquetDictionaryEncodedVectorizedReads.java | 45 ++++++++++++++++++++++
.../vectorized/TestParquetVectorizedReads.java | 2 +-
4 files changed, 76 insertions(+), 5 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index ee053f2..bb571e0 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -101,7 +101,8 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
     LONG,
     FLOAT,
     DOUBLE,
-    TIMESTAMP_MILLIS
+    TIMESTAMP_MILLIS,
+    DICTIONARY
   }
 
   @Override
@@ -112,14 +113,15 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
 
   @Override
   public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-    if (reuse == null) {
-      allocateFieldVector(this.vectorizedColumnIterator.producesDictionaryEncodedVector());
+    boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
+    if (reuse == null || (!dictEncoded && readType == ReadType.DICTIONARY) ||
+        (dictEncoded && readType != ReadType.DICTIONARY)) {
+      allocateFieldVector(dictEncoded);
       nullabilityHolder = new NullabilityHolder(batchSize);
     } else {
       vec.setValueCount(0);
       nullabilityHolder.reset();
     }
-    boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
     if (vectorizedColumnIterator.hasNext()) {
       if (dictEncoded) {
         vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
@@ -176,6 +178,7 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
       this.vec = field.createVector(rootAlloc);
       ((IntVector) vec).allocateNew(batchSize);
       this.typeWidth = (int) IntVector.TYPE_WIDTH;
+      this.readType = ReadType.DICTIONARY;
     } else {
       PrimitiveType primitive = columnDescriptor.getPrimitiveType();
       Field arrowField = ArrowSchemaUtil.convert(icebergField);
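For context on the change above: the reader caches its Arrow vector across batches, but a dictionary-encoded row group is decoded into an IntVector of dictionary ids while a plain-encoded row group uses a vector of the column's actual Arrow type. Before this change, only reuse == null triggered allocation, so a reader that first saw a dictionary-encoded row group kept treating later plain-encoded row groups as dictionary ids. A minimal, self-contained sketch of the new decision (class, method, and the two-member enum are hypothetical stand-ins; only the boolean condition and ReadType.DICTIONARY come from the diff):

    // Sketch of the reallocation decision added to VectorizedArrowReader.read().
    public class ReuseCheckSketch {
      enum ReadType { INT, DICTIONARY }   // the real enum has more members

      static boolean needsReallocation(Object reuse, boolean dictEncoded, ReadType readType) {
        // Reallocate when there is nothing to reuse, or when the cached vector
        // was allocated for the other encoding.
        return reuse == null
            || (!dictEncoded && readType == ReadType.DICTIONARY)
            || (dictEncoded && readType != ReadType.DICTIONARY);
      }

      public static void main(String[] args) {
        Object vec = new Object();
        // Plain vector cached, dictionary-encoded row group next: reallocate.
        System.out.println(needsReallocation(vec, true, ReadType.INT));          // true
        // Dictionary-id vector cached, plain-encoded row group next: reallocate.
        System.out.println(needsReallocation(vec, false, ReadType.DICTIONARY));  // true
        // Encoding unchanged: reuse the cached vector.
        System.out.println(needsReallocation(vec, true, ReadType.DICTIONARY));   // false
      }
    }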
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 61adf56..1f6d7d5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -19,6 +19,7 @@
  */
 package org.apache.iceberg.parquet;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
@@ -690,4 +692,25 @@ public class Parquet {
       return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
     }
   }
+
+  /**
+   * @param inputFiles an {@link Iterable} of parquet files. The order of iteration determines the order in which
+   *                   the content of the files is read and written to the {@code outputFile}
+   * @param outputFile the output parquet file containing all the data from {@code inputFiles}
+   * @param rowGroupSize the row group size to use when writing the {@code outputFile}
+   * @param schema the schema of the data
+   * @param metadata extra metadata to write in the footer of the {@code outputFile}
+   */
+  public static void concat(Iterable<File> inputFiles, File outputFile, int rowGroupSize, Schema schema,
+                            Map<String, String> metadata) throws IOException {
+    OutputFile file = Files.localOutput(outputFile);
+    ParquetFileWriter writer = new ParquetFileWriter(
+        ParquetIO.file(file), ParquetSchemaUtil.convert(schema, "table"),
+        ParquetFileWriter.Mode.CREATE, rowGroupSize, 0);
+    writer.start();
+    for (File inputFile : inputFiles) {
+      writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
+    }
+    writer.end(metadata);
+  }
 }
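The new concat helper appends whole files at the row-group level through ParquetFileWriter.appendFile, so no records are decoded or re-encoded; that is what lets the test below assemble one file whose row groups alternate between dictionary and plain encoding. A usage sketch under illustrative assumptions (the file names, schema, and row group size are made up, not from the commit):

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;
    import static org.apache.iceberg.types.Types.NestedField.required;

    public class ConcatSketch {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema(required(1, "id", Types.LongType.get()));
        // Row groups are appended in iteration order of the input files.
        Parquet.concat(
            ImmutableList.of(new File("dict-encoded.parquet"), new File("plain-encoded.parquet")),
            new File("mixed.parquet"),
            128 * 1024 * 1024,     // 128 MB row groups
            schema,
            ImmutableMap.of());    // no extra footer metadata
      }
    }

Note that the writer uses Mode.CREATE, so the output file must not already exist; the test below deletes its temp files before concatenating for exactly that reason.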
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
index 7f2d9c3..bd5c53d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -19,13 +19,22 @@
  */
 package org.apache.iceberg.spark.data.parquet.vectorized;
 
+import java.io.File;
 import java.io.IOException;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.data.RandomData;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
 public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
 
   @Override
@@ -39,4 +48,40 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
   @Ignore
   public void testVectorizedReadsWithNewContainers() throws IOException {
   }
+
+  @Test
+  public void testMixedDictionaryNonDictionaryReads() throws IOException {
+    Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
+    File dictionaryEncodedFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", dictionaryEncodedFile.delete());
+    Iterable<GenericData.Record> dictionaryEncodableData = RandomData.generateDictionaryEncodableData(
+        schema,
+        10000,
+        0L,
+        RandomData.DEFAULT_NULL_PERCENTAGE);
+    try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, dictionaryEncodedFile)) {
+      writer.addAll(dictionaryEncodableData);
+    }
+
+    File plainEncodingFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", plainEncodingFile.delete());
+    Iterable<GenericData.Record> nonDictionaryData = RandomData.generate(schema, 10000, 0L,
+        RandomData.DEFAULT_NULL_PERCENTAGE);
+    try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, plainEncodingFile)) {
+      writer.addAll(nonDictionaryData);
+    }
+
+    int rowGroupSize = Integer.parseInt(PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+    File mixedFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", mixedFile.delete());
+    Parquet.concat(ImmutableList.of(dictionaryEncodedFile, plainEncodingFile, dictionaryEncodedFile),
+        mixedFile, rowGroupSize, schema, ImmutableMap.of());
+    assertRecordsMatch(
+        schema,
+        30000,
+        FluentIterable.concat(dictionaryEncodableData, nonDictionaryData, dictionaryEncodableData),
+        mixedFile,
+        false,
+        true);
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 3e4f5f9..80c6cd1 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -91,7 +91,7 @@ public class TestParquetVectorizedReads extends AvroDataTest {
         .build();
   }
 
-  private void assertRecordsMatch(
+  void assertRecordsMatch(
       Schema schema, int expectedSize, Iterable<GenericData.Record> expected, File testFile,
       boolean setAndCheckArrowValidityBuffer, boolean reuseContainers)
       throws IOException {