mayursrivastava commented on a change in pull request #2286:
URL: https://github.com/apache/iceberg/pull/2286#discussion_r682565585



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+
+/**
+ * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
+ * See {@link #open(CloseableIterable)} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ *     <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ *     <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ *     <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ *     <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ *     <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ *     <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ *     <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ *         Arrow: {@link MinorType#TIMESTAMPMICROTZ} and {@link MinorType#TIMESTAMPMICRO}</li>
+ *     <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ *     <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:
+ * <ul>
+ *     <li>Type promotion: In case of type promotion, the Arrow vector corresponding to
+ *     the data type in the Parquet file is returned instead of the data type in the latest schema.
+ *     See https://github.com/apache/iceberg/issues/2483.</li>
+ *     <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
+ *     type is int32 instead of the type defined in the schema.
+ *     See https://github.com/apache/iceberg/issues/2484.</li>
+ *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
+ *     {@link Types.DecimalType} are not supported.
+ *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
+ *     <li>Iceberg v2 spec is not supported.
+ *     See https://github.com/apache/iceberg/issues/2487.</li>
Review comment:
       The fixes are present in https://github.com/apache/iceberg/pull/2933.
   I've added checks, similar to the Spark reader's, before enabling the vectorized reader.
   
   The Arrow reader will verify that the following conditions are met (a sketch of these checks follows below):
   1. At least one column is queried.
   2. There are no delete files.
   3. The data types of all queried columns are supported.
   
   I've also changed the reader to not process delete files, because they don't work with this reader anyway.
   
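   For illustration, a minimal pre-flight check of this kind might look like the sketch below. The method name and the exact checks are assumptions for this example, not the actual code in the PR:
   
       // Hypothetical pre-flight validation before enabling the vectorized path.
       private static void validateVectorizedRead(TableScan scan, CombinedScanTask task) {
         Preconditions.checkArgument(!scan.schema().columns().isEmpty(),
             "Vectorized read requires at least one queried column");
         for (FileScanTask fileTask : task.files()) {
           // Delete files are not applied by this reader, so reject them up front.
           Preconditions.checkArgument(fileTask.deletes().isEmpty(),
               "Vectorized read does not support delete files: %s", fileTask.file().path());
         }
         // A visitor over scan.schema() would reject unsupported column types here.
       }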

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,325 @@
+ *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
+ *     {@link Types.DecimalType} are not supported.

Review comment:
       Yes, they will throw an UnsupportedOperationException.
   I've added an explicit exception in the reader code.
   The fixes are present in https://github.com/apache/iceberg/pull/2933.
   
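   As a general illustration (not the exact code in the PR), an explicit rejection of unsupported types could look like this sketch; the method name is hypothetical:
   
       // Hypothetical check that fails fast on unsupported Iceberg types.
       private static void checkSupportedType(Types.NestedField field) {
         switch (field.type().typeId()) {
           case BOOLEAN: case INTEGER: case LONG: case FLOAT: case DOUBLE:
           case STRING: case BINARY: case DATE: case TIMESTAMP:
             return; // supported by this reader
           default:
             throw new UnsupportedOperationException(
                 "Unsupported vectorized read for type: " + field.type());
         }
       }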

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,325 @@
+public class ArrowReader extends CloseableGroup {
+
+  private final Schema schema;
+  private final FileIO io;
+  private final EncryptionManager encryption;
+  private final int batchSize;
+  private final boolean reuseContainers;
+
+  /**
+   * Create a new instance of the reader.
+   *
+   * @param scan the table scan object.
+   * @param batchSize the maximum number of rows per Arrow batch.
+   * @param reuseContainers whether to reuse Arrow vectors when iterating through the data.
+   *                        If set to {@code false}, every {@link Iterator#next()} call creates
+   *                        new instances of Arrow vectors.
+   *                        If set to {@code true}, the Arrow vectors in the previous
+   *                        {@link Iterator#next()} may be reused for the data returned
+   *                        in the current {@link Iterator#next()}.
+   *                        This option avoids allocating memory repeatedly.
+   *                        Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+   *                        in the previous {@link Iterator#next()} call are closed before creating
+   *                        new instances in the current {@link Iterator#next()}.
+   */
+  public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
+    this.schema = scan.schema();
+    this.io = scan.table().io();
+    this.encryption = scan.table().encryption();
+    this.batchSize = batchSize;
+    this.reuseContainers = reuseContainers;
+  }
+
+  /**
+   * Returns a new iterator of {@link ColumnarBatch} objects.
+   * <p>
+   * Note that the reader owns the {@link ColumnarBatch} objects and takes care of closing them.
+   * The caller should not hold onto a {@link ColumnarBatch} or try to close it.
+   *
+   * <p>If {@code reuseContainers} is {@code false}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} are closed before returning the next {@link ColumnarBatch} object.
+   * This implies that the caller should either use the {@link ColumnarBatch} or transfer
+   * ownership of the {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   *
+   * <p>If {@code reuseContainers} is {@code true}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
+   * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
+   * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   */
+  public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
+    CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
+        tasks,
+        schema,
+        null,
+        io,
+        encryption,
+        true,
+        batchSize,
+        reuseContainers
+    );
+    addCloseable(itr);
+    return itr;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close(); // close data files
+  }
+
+  /**
+   * Reads the data files and returns an iterator of {@link ColumnarBatch}.
+   * Only the Parquet data file format is supported.
+   */
+  private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
+
+    private final List<FileScanTask> fileTasks;
+    private final Iterator<FileScanTask> fileItr;
+    private final Map<String, InputFile> inputFiles;
+    private final Schema expectedSchema;
+    private final String nameMapping;
+    private final boolean caseSensitive;
+    private final int batchSize;
+    private final boolean reuseContainers;
+    private CloseableIterator<ColumnarBatch> currentIterator;
+    private ColumnarBatch current;
+    private FileScanTask currentTask;
+
+    /**
+     * Create a new instance.
+     *
+     * @param tasks             Combined file scan tasks.
+     * @param expectedSchema    Read schema. The returned data will have this schema.
+     * @param nameMapping       Mapping from external schema names to Iceberg type IDs.
+     * @param io                File I/O.
+     * @param encryptionManager Encryption manager.
+     * @param caseSensitive     If {@code true}, column names are case sensitive.
+     *                          If {@code false}, column names are not case sensitive.
+     * @param batchSize         Batch size in number of rows. Each Arrow batch contains
+     *                          a maximum of {@code batchSize} rows.
+     * @param reuseContainers   If set to {@code false}, every {@link Iterator#next()} call creates
+     *                          new instances of Arrow vectors.
+     *                          If set to {@code true}, the Arrow vectors in the previous
+     *                          {@link Iterator#next()} may be reused for the data returned
+     *                          in the current {@link Iterator#next()}.
+     *                          This option avoids allocating memory repeatedly.
+     *                          Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+     *                          in the previous {@link Iterator#next()} call are closed before creating
+     *                          new instances in the current {@link Iterator#next()}.
+     */
+    VectorizedCombinedScanIterator(
+        CloseableIterable<CombinedScanTask> tasks,
+        Schema expectedSchema,
+        String nameMapping,
+        FileIO io,
+        EncryptionManager encryptionManager,
+        boolean caseSensitive,
+        int batchSize,
+        boolean reuseContainers) {

Review comment:
       Fixed in https://github.com/apache/iceberg/pull/2933
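   For context, a minimal usage sketch of this API, based on the constructor and open() signatures quoted above (the table variable, its setup, and the column names are assumptions):
   
       // Hypothetical usage; assumes an existing Iceberg Table instance named table.
       TableScan scan = table.newScan().select("id", "data");
       try (ArrowReader reader = new ArrowReader(scan, 1024, false);
            CloseableIterator<ColumnarBatch> batches = reader.open(scan.planTasks())) {
         while (batches.hasNext()) {
           ColumnarBatch batch = batches.next();
           // Consume the batch here; the reader owns and closes each batch.
         }
       }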

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,325 @@
+    VectorizedCombinedScanIterator(
+        CloseableIterable<CombinedScanTask> tasks,
+        Schema expectedSchema,
+        String nameMapping,
+        FileIO io,
+        EncryptionManager encryptionManager,
+        boolean caseSensitive,
+        int batchSize,
+        boolean reuseContainers) {
+      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+          .map(CombinedScanTask::files)
+          .flatMap(Collection::stream)
+          .collect(Collectors.toList());
+      this.fileItr = fileTasks.iterator();
+      this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
+          .map(encryptionManager::decrypt)
+          .collect(Collectors.toMap(InputFile::location, Function.identity())));
+      this.currentIterator = CloseableIterator.empty();
+      this.expectedSchema = expectedSchema;
+      this.nameMapping = nameMapping;
+      this.caseSensitive = caseSensitive;
+      this.batchSize = batchSize;
+      this.reuseContainers = reuseContainers;
+    }
+
+    @Override
+    public boolean hasNext() {
+      try {
+        while (true) {
+          if (currentIterator.hasNext()) {
+            this.current = currentIterator.next();
+            return true;
+          } else if (fileItr.hasNext()) {
+            this.currentIterator.close();
+            this.currentTask = fileItr.next();
+            this.currentIterator = open(currentTask);
+          } else {
+            this.currentIterator.close();
+            return false;
+          }
+        }
+      } catch (IOException | RuntimeException e) {
+        if (currentTask != null && !currentTask.isDataTask()) {
+          throw new RuntimeException(
+              "Error reading file: " + getInputFile(currentTask).location() +
+                  ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +

Review comment:
       Falling back to the same logic as in BaseDataReader.
   But Iterator doesn't support throwing checked exceptions, so I still have to wrap the exception in a RuntimeException.
   Fixes are present in https://github.com/apache/iceberg/pull/2933.
   
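   As a general illustration of that constraint (not the PR's exact code), tunneling a checked exception out of an Iterator method looks like this; advance() is a hypothetical helper that performs the I/O:
   
       // Iterator#hasNext() declares no checked exceptions, so IOException must
       // be wrapped in an unchecked exception before it can propagate.
       @Override
       public boolean hasNext() {
         try {
           return advance(); // hypothetical helper that reads ahead
         } catch (IOException e) {
           throw new UncheckedIOException("Failed to read next batch", e);
         }
       }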

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,325 @@
+      this.inputFiles = Collections.unmodifiableMap(fileTasks.stream()
+          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(file -> EncryptedFiles.encryptedInput(io.newInputFile(file.path().toString()), file.keyMetadata()))
+          .map(encryptionManager::decrypt)

Review comment:
       This was already fixed in #2720

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ColumnVector.java
##########
@@ -0,0 +1,116 @@
+package org.apache.iceberg.arrow.vectorized;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.iceberg.types.Types;
+
+/**
+ * This class is inspired by Spark's {@code ColumnVector}.
+ * It represents the column data for an Iceberg table query,
+ * wrapping an Arrow {@link FieldVector} and providing simple
+ * accessors for the row values. Advanced users can access
+ * the underlying {@link FieldVector} directly.
+ * <p>
+ *   Supported Iceberg data types:
+ *   <ul>
+ *     <li>{@link Types.BooleanType}</li>
+ *     <li>{@link Types.IntegerType}</li>
+ *     <li>{@link Types.LongType}</li>
+ *     <li>{@link Types.FloatType}</li>
+ *     <li>{@link Types.DoubleType}</li>
+ *     <li>{@link Types.StringType}</li>
+ *     <li>{@link Types.BinaryType}</li>
+ *     <li>{@link Types.TimestampType} (with and without timezone)</li>
+ *     <li>{@link Types.DateType}</li>
+ *   </ul>
+ */
+public class ColumnVector implements AutoCloseable {

Review comment:
       My use case is to use the Arrow VectorSchemaRoot directly, but I agree with @rymurr's suggestion to wrap the Arrow data structures.
   
   I see the following benefits in the wrapper interface:
   1. Lifecycle management is better.
   2. The current Parquet reader returns the physical representation of the data as Arrow vectors. This means that dictionary-encoded columns are returned as int32, and widened columns keep the physical column width from the file (e.g. if int32 was widened to int64 and the file contains int32, the Arrow vector is int32). The wrapper classes can handle dictionary encoding and type widening correctly; see the sketch after this comment. Note that this is also done in the Spark version. This, however, is a limitation of the current implementation, and it would be better for the Arrow reader to return Arrow vectors with logical types.
   3. The wrapper interface is easier for most users to use.
   
   The cons are the following:
   1. A new API is introduced that has to be learned and maintained.
   
   I don't feel strongly about the wrapper interface, though; if the community doesn't find it a useful abstraction, I agree with dropping it.
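   For illustration, a wrapper accessor that resolves dictionary-encoded values could look roughly like the sketch below; the class and field names are assumptions for this example, not the PR's actual accessor code:
   
       // Sketch: hide dictionary encoding behind a typed accessor. The reader
       // materializes dictionary-encoded columns as int32 index vectors; the
       // accessor resolves each index against the dictionary on access.
       class DictionaryLongAccessor {
         private final IntVector indexVector;  // physical int32 dictionary indices
         private final Dictionary dictionary;  // maps index -> logical value
       
         DictionaryLongAccessor(IntVector indexVector, Dictionary dictionary) {
           this.indexVector = indexVector;
           this.dictionary = dictionary;
         }
       
         long getLong(int rowId) {
           // Assumes the dictionary values are stored in a BigIntVector.
           int index = indexVector.get(rowId);
           return ((BigIntVector) dictionary.getVector()).get(index);
         }
       }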




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


