rymurr commented on a change in pull request #2286:
URL: https://github.com/apache/iceberg/pull/2286#discussion_r604137792



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedParquetReaders.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Builds an {@link ArrowBatchReader}.
+ */
+class VectorizedParquetReaders {
+
+  private VectorizedParquetReaders() {
+  }
+
+  /**
+   * Build the {@link ArrowBatchReader} for the expected schema and file schema.
+   *
+   * @param expectedSchema         Expected schema of the data returned.
+   * @param fileSchema             Schema of the data file.
+   * @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
+  public static ArrowBatchReader buildReader(

Review comment:
       could this live in the iterator? I am not sure it needs its own static class for this.

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatch.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.Arrays;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+
+/**
+ * This class is inspired by Spark's {@code ColumnarBatch}.
+ * This class wraps a columnar batch in the result set of an Iceberg table query.
+ */
+public class ArrowBatch implements AutoCloseable {

Review comment:
       nit: could we stick w/ `ColumnBatch` here and elsewhere? Just to hide the Arrow-ness of the vectorized reader from non-experts.

##########
File path: arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
##########
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.OverwriteFiles;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.types.Types;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.iceberg.Files.localInput;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test cases for {@link ArrowReader}.
+ * <p>All tests create a table with monthly partitions and write 1 year of data to the table.
+ */
+public class ArrowReaderTest {
+
+  private static final int NUM_ROWS_PER_MONTH = 20;
+  private static final ImmutableList<String> ALL_COLUMNS =
+      ImmutableList.of(
+          "timestamp",
+          "timestamp_nullable",
+          "boolean",
+          "boolean_nullable",
+          "int",
+          "int_nullable",
+          "long",
+          "long_nullable",
+          "float",
+          "float_nullable",
+          "double",
+          "double_nullable",
+          "timestamp_tz",
+          "timestamp_tz_nullable",
+          "string",
+          "string_nullable",
+          "bytes",
+          "bytes_nullable",
+          "date",
+          "date_nullable",
+          "int_promotion"
+      );
+
+  @Rule
+  public final TemporaryFolder temp = new TemporaryFolder();
+
+  private HadoopTables tables;
+
+  private String tableLocation;
+  private List<GenericRecord> rowsWritten;
+
+  /**
+   * Read all rows and columns from the table without any filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test
+   * asserts that the vectors contain the expected values. The test also asserts that the total
+   * number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAll() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckVectorSchemaRoots(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * This test writes each partition with constant value rows. The Arrow vectors returned are
+   * mostly of type int32, which is unexpected. This happens because of dictionary encoding at
+   * the storage level.
+   * <p>
+   * Following are the expected and actual Arrow schema:
+   * <pre>
+   * Expected Arrow Schema:
+   * timestamp: Timestamp(MICROSECOND, null) not null,
+   * timestamp_nullable: Timestamp(MICROSECOND, null),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(64, true) not null,
+   * long_nullable: Int(64, true),
+   * float: FloatingPoint(SINGLE) not null,
+   * float_nullable: FloatingPoint(SINGLE),
+   * double: FloatingPoint(DOUBLE) not null,
+   * double_nullable: FloatingPoint(DOUBLE),
+   * timestamp_tz: Timestamp(MICROSECOND, UTC) not null,
+   * timestamp_tz_nullable: Timestamp(MICROSECOND, UTC),
+   * string: Utf8 not null,
+   * string_nullable: Utf8,
+   * bytes: Binary not null,
+   * bytes_nullable: Binary,
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   *
+   * Actual Arrow Schema:
+   * timestamp: Int(32, true) not null,
+   * timestamp_nullable: Int(32, true),
+   * boolean: Bool not null,
+   * boolean_nullable: Bool,
+   * int: Int(32, true) not null,
+   * int_nullable: Int(32, true),
+   * long: Int(32, true) not null,
+   * long_nullable: Int(32, true),
+   * float: Int(32, true) not null,
+   * float_nullable: Int(32, true),
+   * double: Int(32, true) not null,
+   * double_nullable: Int(32, true),
+   * timestamp_tz: Int(32, true) not null,
+   * timestamp_tz_nullable: Int(32, true),
+   * string: Int(32, true) not null,
+   * string_nullable: Int(32, true),
+   * bytes: Int(32, true) not null,
+   * bytes_nullable: Int(32, true),
+   * date: Date(DAY) not null,
+   * date_nullable: Date(DAY),
+   * int_promotion: Int(32, true) not null
+   * </pre>
+   * <p>
+   * TODO: fix the returned Arrow vectors to have vector types consistent with Iceberg types.
+   * <p>
+   * Read all rows and columns from the table without any filter. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test
+   * asserts that the vectors contain the expected values. The test also asserts that the total
+   * number of rows matches the expected value.
+   */
+  @Test
+  @Ignore
+  public void testReadAllWithConstantRecords() throws Exception {
+    writeTableWithConstantRecords();
+    Table table = tables.load(tableLocation);
+    readAndCheckVectorSchemaRoots(table.newScan(), NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read all rows and columns from the table without any filter. The test uses a batch size
+   * smaller than the number of rows in a partition. The test asserts that the Arrow
+   * {@link VectorSchemaRoot} contains the expected schema and expected vector types. Then the test
+   * asserts that the vectors contain the expected values. The test also asserts that the total
+   * number of rows matches the expected value.
+   */
+  @Test
+  public void testReadAllWithSmallerBatchSize() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan();
+    readAndCheckVectorSchemaRoots(scan, 10, 12 * NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time range row filter. The test
+   * asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and expected
+   * vector types. Then the test asserts that the vectors contain the expected values. The test
+   * also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadRangeFilter() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2020, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2020, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+        .filter(Expressions.and(
+            Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+            Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    readAndCheckVectorSchemaRoots(scan, NUM_ROWS_PER_MONTH, NUM_ROWS_PER_MONTH, ALL_COLUMNS);
+  }
+
+  /**
+   * Read selected rows and all columns from the table using a time range row filter.
+   * The test asserts that the result is empty.
+   */
+  @Test
+  public void testReadRangeFilterEmptyResult() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    LocalDateTime beginTime = LocalDateTime.of(2021, 1, 1, 0, 0, 0);
+    LocalDateTime endTime = LocalDateTime.of(2021, 2, 1, 0, 0, 0);
+    TableScan scan = table.newScan()
+            .filter(Expressions.and(
+                    Expressions.greaterThanOrEqual("timestamp", timestampToMicros(beginTime)),
+                    Expressions.lessThan("timestamp", timestampToMicros(endTime))));
+    int numRoots = 0;
+    for (ArrowBatch batch : new VectorizedTableScanIterable(scan, NUM_ROWS_PER_MONTH, false)) {
+      numRoots++;
+    }
+    assertEquals(0, numRoots);
+  }
+
+  /**
+   * Read all rows and selected columns from the table with a column selection filter. The test
+   * asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and expected
+   * vector types. Then the test asserts that the vectors contain the expected values. The test
+   * also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter1() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp", "int", "string");
+    readAndCheckVectorSchemaRoots(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp", "int", "string"));
+  }
+
+  /**
+   * Read all rows and a single column from the table with a column selection filter. The test
+   * asserts that the Arrow {@link VectorSchemaRoot} contains the expected schema and expected
+   * vector types. Then the test asserts that the vectors contain the expected values. The test
+   * also asserts that the total number of rows matches the expected value.
+   */
+  @Test
+  public void testReadColumnFilter2() throws Exception {
+    writeTableWithIncrementalRecords();
+    Table table = tables.load(tableLocation);
+    TableScan scan = table.newScan()
+        .select("timestamp");
+    readAndCheckVectorSchemaRoots(
+        scan, NUM_ROWS_PER_MONTH, 12 * NUM_ROWS_PER_MONTH,
+        ImmutableList.of("timestamp"));
+  }
+
+  private void readAndCheckVectorSchemaRoots(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      List<String> columns) {
+    Set<String> columnSet = ImmutableSet.copyOf(columns);
+    int rowIndex = 0;
+    int totalRows = 0;
+    for (ArrowBatch batch : new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {

Review comment:
       should this be in a try-with-resources block? `VectorizedTableScanIterable` is never closed, right?
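
       For example, a minimal sketch of the suggestion (assuming `VectorizedTableScanIterable` implements `CloseableIterable`, and therefore `AutoCloseable`):

```java
// Closing the iterable deterministically releases the underlying file readers and Arrow memory.
try (VectorizedTableScanIterable itr = new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
  for (ArrowBatch batch : itr) {
    // ... per-batch assertions ...
  }
}
```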

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedCombinedScanIterator.java
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reads the data file and returns an iterator of {@link ArrowBatch}.
+ * Only the Parquet file format is supported.
+ */
+class VectorizedCombinedScanIterator implements CloseableIterator<ArrowBatch> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedCombinedScanIterator.class);
+
+  private final Iterator<FileScanTask> tasks;
+  private final Map<String, InputFile> inputFiles;
+  private final Schema expectedSchema;
+  private final String nameMapping;
+  private final boolean caseSensitive;
+  private final int batchSize;
+  private final boolean reuseContainers;
+  private CloseableIterator<ArrowBatch> currentIterator;
+  private ArrowBatch current;
+  private FileScanTask currentTask;
+
+  /**
+   * Create a new instance.
+   *
+   * @param task              Combined file scan task.
+   * @param expectedSchema    Read schema. The returned data will have this schema.
+   * @param nameMapping       Mapping from external schema names to Iceberg type IDs.
+   * @param io                File I/O.
+   * @param encryptionManager Encryption manager.
+   * @param caseSensitive     If {@code true}, column names are case sensitive.
+   *                          If {@code false}, column names are not case sensitive.
+   * @param batchSize         Batch size in number of rows. Each Arrow batch contains
+   *                          a maximum of {@code batchSize} rows.
+   * @param reuseContainers   If set to {@code false}, every {@link Iterator#next()} call creates
+   *                          new instances of Arrow vectors.
+   *                          If set to {@code true}, the Arrow vectors in the previous
+   *                          {@link Iterator#next()} call may be reused for the data returned
+   *                          in the current {@link Iterator#next()} call.
+   *                          This option avoids repeated memory allocation.
+   *                          Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+   *                          in the previous {@link Iterator#next()} call are closed before creating
+   *                          new instances in the current {@link Iterator#next()} call.
+   */
+  VectorizedCombinedScanIterator(
+          CombinedScanTask task,
+          Schema expectedSchema,
+          String nameMapping,
+          FileIO io,
+          EncryptionManager encryptionManager,
+          boolean caseSensitive,
+          int batchSize,
+          boolean reuseContainers) {
+    this.tasks = task.files().iterator();
+    Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
+    task.files().stream()
+        .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+        .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
+    Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
+        .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+
+    // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+    Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
+    Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(task.files().size());
+    decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+    this.inputFiles = Collections.unmodifiableMap(files);
+
+    this.currentIterator = CloseableIterator.empty();
+    this.expectedSchema = expectedSchema;
+    this.nameMapping = nameMapping;
+    this.caseSensitive = caseSensitive;
+    this.batchSize = batchSize;
+    this.reuseContainers = reuseContainers;
+  }
+
+  @Override
+  public boolean hasNext() {

Review comment:
       What is the difference between this class and `ArrowReader`? There seems to be some duplication.

##########
File path: arrow/src/test/java/org/apache/iceberg/arrow/vectorized/ArrowReaderTest.java
##########
@@ -0,0 +1,734 @@
+  private void readAndCheckVectorSchemaRoots(
+      TableScan scan,
+      int numRowsPerRoot,
+      int expectedTotalRows,
+      List<String> columns) {
+    Set<String> columnSet = ImmutableSet.copyOf(columns);
+    int rowIndex = 0;
+    int totalRows = 0;
+    for (ArrowBatch batch : new VectorizedTableScanIterable(scan, numRowsPerRoot, false)) {
+      VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();

Review comment:
       As this is likely to be used as a reference for anyone looking to use the vectorized reader, it would be good to include some tests of the `ArrowBatch` as well.
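
       For instance, a rough sketch of such a test (the `numRows()` accessor is hypothetical and would need to match whatever `ArrowBatch` actually exposes; it also assumes the iterable is closed via try-with-resources, per the earlier comment):

```java
@Test
public void testArrowBatchRowCount() throws Exception {
  writeTableWithIncrementalRecords();
  Table table = tables.load(tableLocation);
  try (VectorizedTableScanIterable itr =
      new VectorizedTableScanIterable(table.newScan(), NUM_ROWS_PER_MONTH, false)) {
    for (ArrowBatch batch : itr) {
      VectorSchemaRoot root = batch.createVectorSchemaRootFromVectors();
      // the batch and the VectorSchemaRoot it creates should agree on the row count
      assertEquals(batch.numRows(), root.getRowCount());
    }
  }
}
```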

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVector.java
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.nio.charset.StandardCharsets;
+import java.util.stream.IntStream;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.holders.NullableVarCharHolder;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * This class is inspired by Spark's {@code ColumnVector}.
+ * This class represents the column data for an Iceberg table query.
+ * It wraps an Arrow {@link FieldVector} and provides simple
+ * accessors for the row values. Advanced users can access
+ * the {@link FieldVector}.
+ * <p>
+ *   Supported Iceberg data types:
+ *   <ul>
+ *     <li>{@link Types.BooleanType}</li>
+ *     <li>{@link Types.IntegerType}</li>
+ *     <li>{@link Types.LongType}</li>
+ *     <li>{@link Types.FloatType}</li>
+ *     <li>{@link Types.DoubleType}</li>
+ *     <li>{@link Types.StringType}</li>
+ *     <li>{@link Types.BinaryType}</li>
+ *     <li>{@link Types.TimestampType} (with and without timezone)</li>
+ *     <li>{@link Types.DateType}</li>
+ *   </ul>
+ */
+public class ArrowVector implements AutoCloseable {

Review comment:
       There is a fair bit of duplication between this class and `org.apache.iceberg.spark.data.vectorized`. I think we should be able to collapse these and the Spark classes with, e.g., a parameter or two. We can then remove the Spark classes and reference these.
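
       To illustrate the "parameter" idea (a sketch only; it reuses the `buildReader(expectedSchema, fileSchema, setArrowValidityVector)` signature documented earlier in this PR, and the two factory method names are hypothetical):

```java
// A single shared factory could serve both callers; only the validity-vector flag would differ.
// Spark's vectorized path may need Arrow validity vectors populated; the pure-Arrow path may not.
static ArrowBatchReader forSpark(Schema expectedSchema, MessageType fileSchema) {
  return VectorizedParquetReaders.buildReader(
      expectedSchema, fileSchema, NullCheckingForGet.NULL_CHECKING_ENABLED);
}

static ArrowBatchReader forArrow(Schema expectedSchema, MessageType fileSchema) {
  return VectorizedParquetReaders.buildReader(expectedSchema, fileSchema, false);
}
```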




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
