This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new e815318  Parquet: Fix vectorized reads with dictionary and non-dictionary row groups (#1388)
e815318 is described below

commit e815318a5378c126e05888c55c7e02955b0946f0
Author: Samarth Jain <[email protected]>
AuthorDate: Fri Aug 28 10:02:52 2020 -0700

    Parquet: Fix vectorized reads with dictionary and non-dictionary row groups (#1388)
---
 .../arrow/vectorized/VectorizedArrowReader.java    | 11 ++++--
 .../java/org/apache/iceberg/parquet/Parquet.java   | 23 +++++++++++
 ...estParquetDictionaryEncodedVectorizedReads.java | 45 ++++++++++++++++++++++
 .../vectorized/TestParquetVectorizedReads.java     |  2 +-
 4 files changed, 76 insertions(+), 5 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index ee053f2..bb571e0 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -101,7 +101,8 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
     LONG,
     FLOAT,
     DOUBLE,
-    TIMESTAMP_MILLIS
+    TIMESTAMP_MILLIS,
+    DICTIONARY
   }
 
   @Override
@@ -112,14 +113,15 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
 
   @Override
   public VectorHolder read(VectorHolder reuse, int numValsToRead) {
-    if (reuse == null) {
-      allocateFieldVector(this.vectorizedColumnIterator.producesDictionaryEncodedVector());
+    boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
+    if (reuse == null || (!dictEncoded && readType == ReadType.DICTIONARY) ||
+        (dictEncoded && readType != ReadType.DICTIONARY)) {
+      allocateFieldVector(dictEncoded);
       nullabilityHolder = new NullabilityHolder(batchSize);
     } else {
       vec.setValueCount(0);
       nullabilityHolder.reset();
     }
-    boolean dictEncoded = vectorizedColumnIterator.producesDictionaryEncodedVector();
     if (vectorizedColumnIterator.hasNext()) {
       if (dictEncoded) {
        vectorizedColumnIterator.nextBatchDictionaryIds((IntVector) vec, nullabilityHolder);
@@ -176,6 +178,7 @@ public class VectorizedArrowReader implements VectorizedReader<VectorHolder> {
       this.vec = field.createVector(rootAlloc);
       ((IntVector) vec).allocateNew(batchSize);
       this.typeWidth = (int) IntVector.TYPE_WIDTH;
+      this.readType = ReadType.DICTIONARY;
     } else {
       PrimitiveType primitive = columnDescriptor.getPrimitiveType();
       Field arrowField = ArrowSchemaUtil.convert(icebergField);
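
The hunk above reallocates the batch vector whenever a row group's encoding
differs from the one the current vector was allocated for. As a minimal,
self-contained sketch of the failure mode this fixes (illustrative only; the
class and variable names below are not from this commit): a vector allocated
for 32-bit dictionary ids cannot be reused once a plain-encoded BIGINT row
group needs 64-bit value slots.

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.BigIntVector;
    import org.apache.arrow.vector.FieldVector;
    import org.apache.arrow.vector.IntVector;

    public class MixedEncodingSketch {
      public static void main(String[] args) {
        try (RootAllocator alloc = new RootAllocator()) {
          // Row group 1 is dictionary-encoded: the batch vector holds int ids.
          FieldVector vec = new IntVector("ids", alloc);
          ((IntVector) vec).allocateNew(1024);
          boolean allocatedForDictionary = true;

          // Row group 2 is plain-encoded BIGINT. Before the fix, the reader
          // kept reusing `vec`, so batches were read into a vector of the
          // wrong type and width (corrupt values or a ClassCastException).
          boolean dictEncoded = false;  // what the next row group reports
          if (dictEncoded != allocatedForDictionary) {
            vec.close();  // encoding flipped: release and reallocate
            vec = new BigIntVector("values", alloc);
            ((BigIntVector) vec).allocateNew(1024);
          }
          vec.close();
        }
      }
    }
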
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
index 61adf56..1f6d7d5 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -19,6 +19,7 @@
 
 package org.apache.iceberg.parquet;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -30,6 +31,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.PartitionSpec;
@@ -690,4 +692,25 @@ public class Parquet {
      return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
     }
   }
+
+  /**
+   * @param inputFiles   an {@link Iterable} of Parquet files. The order of iteration determines the order in which
+   *                     the contents of the files are read and written to the {@code outputFile}
+   * @param outputFile   the output Parquet file containing all the data from {@code inputFiles}
+   * @param rowGroupSize the row group size to use when writing the {@code outputFile}
+   * @param schema       the schema of the data
+   * @param metadata     extra metadata to write in the footer of the {@code outputFile}
+   */
+  public static void concat(Iterable<File> inputFiles, File outputFile, int rowGroupSize, Schema schema,
+                            Map<String, String> metadata) throws IOException {
+    OutputFile file = Files.localOutput(outputFile);
+    ParquetFileWriter writer = new ParquetFileWriter(
+            ParquetIO.file(file), ParquetSchemaUtil.convert(schema, "table"),
+            ParquetFileWriter.Mode.CREATE, rowGroupSize, 0);
+    writer.start();
+    for (File inputFile : inputFiles) {
+      writer.appendFile(ParquetIO.file(Files.localInput(inputFile)));
+    }
+    writer.end(metadata);
+  }
 }
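
A hypothetical usage of the new concat helper (the file names and the
single-column schema are illustrative, not from this commit). Since
ParquetFileWriter.appendFile copies each input's row groups verbatim, the
output can mix dictionary- and plain-encoded row groups, which is exactly
what the new test below exercises.

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.parquet.Parquet;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.types.Types;

    public class ConcatExample {
      public static void main(String[] args) throws IOException {
        Schema schema = new Schema(
            Types.NestedField.required(1, "id", Types.LongType.get()));
        // Stitch two local Parquet files into one output file.
        Parquet.concat(
            ImmutableList.of(new File("dict-encoded.parquet"), new File("plain.parquet")),
            new File("mixed.parquet"),
            128 * 1024 * 1024,   // row group size in bytes
            schema,
            ImmutableMap.of());  // no extra footer metadata
      }
    }
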
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
index 7f2d9c3..bd5c53d 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetDictionaryEncodedVectorizedReads.java
@@ -19,13 +19,22 @@
 
 package org.apache.iceberg.spark.data.parquet.vectorized;
 
+import java.io.File;
 import java.io.IOException;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.spark.data.RandomData;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT;
+
 public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVectorizedReads {
 
   @Override
@@ -39,4 +48,40 @@ public class TestParquetDictionaryEncodedVectorizedReads extends TestParquetVect
   public void testVectorizedReadsWithNewContainers() throws IOException {
 
   }
+
+  @Test
+  public void testMixedDictionaryNonDictionaryReads() throws IOException {
+    Schema schema = new Schema(SUPPORTED_PRIMITIVES.fields());
+    File dictionaryEncodedFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", dictionaryEncodedFile.delete());
+    Iterable<GenericData.Record> dictionaryEncodableData = RandomData.generateDictionaryEncodableData(
+            schema,
+            10000,
+            0L,
+            RandomData.DEFAULT_NULL_PERCENTAGE);
+    try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, dictionaryEncodedFile)) {
+      writer.addAll(dictionaryEncodableData);
+    }
+
+    File plainEncodingFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", plainEncodingFile.delete());
+    Iterable<GenericData.Record> nonDictionaryData = RandomData.generate(schema, 10000, 0L,
+            RandomData.DEFAULT_NULL_PERCENTAGE);
+    try (FileAppender<GenericData.Record> writer = getParquetWriter(schema, plainEncodingFile)) {
+      writer.addAll(nonDictionaryData);
+    }
+
+    int rowGroupSize = Integer.parseInt(PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT);
+    File mixedFile = temp.newFile();
+    Assert.assertTrue("Delete should succeed", mixedFile.delete());
+    Parquet.concat(ImmutableList.of(dictionaryEncodedFile, plainEncodingFile, dictionaryEncodedFile),
+            mixedFile, rowGroupSize, schema, ImmutableMap.of());
+    assertRecordsMatch(
+            schema,
+            30000,
+            FluentIterable.concat(dictionaryEncodableData, nonDictionaryData, dictionaryEncodableData),
+            mixedFile,
+            false,
+            true);
+  }
 }
diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 3e4f5f9..80c6cd1 100644
--- a/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -91,7 +91,7 @@ public class TestParquetVectorizedReads extends AvroDataTest {
         .build();
   }
 
-  private void assertRecordsMatch(
+  void assertRecordsMatch(
      Schema schema, int expectedSize, Iterable<GenericData.Record> expected, File testFile,
       boolean setAndCheckArrowValidityBuffer, boolean reuseContainers)
       throws IOException {
