rymurr commented on a change in pull request #2286:
URL: https://github.com/apache/iceberg/pull/2286#discussion_r617404005
##########
File path: build.gradle
##########
@@ -735,9 +735,12 @@ project(':iceberg-arrow') {
exclude group: 'com.google.code.findbugs', module: 'jsr305'
}
compile("org.apache.arrow:arrow-memory-netty") {
- exclude group: 'io.netty', module: 'netty-common'
exclude group: 'com.google.code.findbugs', module: 'jsr305'
Review comment:
how come the netty module exclusion was removed? Presumably this will have
knock-on effects with the shaded jars?
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+import org.apache.arrow.vector.NullCheckingForGet;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types.MinorType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.parquet.schema.MessageType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Vectorized reader that returns an iterator of {@link ColumnarBatch}.
+ * See {@link #open(CloseableIterable)} to learn about the
+ * behavior of the iterator.
+ *
+ * <p>The following Iceberg data types are supported and have been tested:
+ * <ul>
+ *     <li>Iceberg: {@link Types.BooleanType}, Arrow: {@link MinorType#BIT}</li>
+ *     <li>Iceberg: {@link Types.IntegerType}, Arrow: {@link MinorType#INT}</li>
+ *     <li>Iceberg: {@link Types.LongType}, Arrow: {@link MinorType#BIGINT}</li>
+ *     <li>Iceberg: {@link Types.FloatType}, Arrow: {@link MinorType#FLOAT4}</li>
+ *     <li>Iceberg: {@link Types.DoubleType}, Arrow: {@link MinorType#FLOAT8}</li>
+ *     <li>Iceberg: {@link Types.StringType}, Arrow: {@link MinorType#VARCHAR}</li>
+ *     <li>Iceberg: {@link Types.TimestampType} (both with and without timezone),
+ *         Arrow: {@link MinorType#TIMEMICRO}</li>
+ *     <li>Iceberg: {@link Types.BinaryType}, Arrow: {@link MinorType#VARBINARY}</li>
+ *     <li>Iceberg: {@link Types.DateType}, Arrow: {@link MinorType#DATEDAY}</li>
+ * </ul>
+ *
+ * <p>Features that don't work in this implementation:
+ * <ul>
+ *     <li>Type promotion: In case of type promotion, the Arrow vector corresponding to
+ *     the data type in the parquet file is returned instead of the data type in the latest schema.
+ *     See https://github.com/apache/iceberg/issues/2483.</li>
+ *     <li>Columns with constant values are physically encoded as a dictionary. The Arrow vector
+ *     type is int32 instead of the type as per the schema.
+ *     See https://github.com/apache/iceberg/issues/2484.</li>
+ *     <li>Data types: {@link Types.TimeType}, {@link Types.ListType}, {@link Types.MapType},
+ *     {@link Types.StructType}, {@link Types.UUIDType}, {@link Types.FixedType} and
+ *     {@link Types.DecimalType}.
+ *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
+ *     <li>Iceberg v2 spec is not supported.
+ *     See https://github.com/apache/iceberg/issues/2487.</li>
+ * </ul>
+ */
+public class ArrowReader extends CloseableGroup {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+ private final Schema schema;
+ private final FileIO io;
+ private final EncryptionManager encryption;
+ private final int batchSize;
+ private final boolean reuseContainers;
+
+  /**
+   * Create a new instance of the reader.
+   *
+   * @param scan the table scan object.
+   * @param batchSize the maximum number of rows per Arrow batch.
+   * @param reuseContainers whether to reuse Arrow vectors when iterating through the data.
+   *                        If set to {@code false}, every {@link Iterator#next()} call creates
+   *                        new instances of Arrow vectors.
+   *                        If set to {@code true}, the Arrow vectors in the previous
+   *                        {@link Iterator#next()} may be reused for the data returned
+   *                        in the current {@link Iterator#next()}.
+   *                        This option avoids allocating memory again and again.
+   *                        Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+   *                        in the previous {@link Iterator#next()} call are closed before creating
+   *                        new instances in the current {@link Iterator#next()}.
+   */
+ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
+ this.schema = scan.schema();
+ this.io = scan.table().io();
+ this.encryption = scan.table().encryption();
+ this.batchSize = batchSize;
+ // start planning tasks in the background
+ this.reuseContainers = reuseContainers;
+ }
+
+  /**
+   * Returns a new iterator of {@link ColumnarBatch} objects.
+   * <p>
+   * Note that the reader owns the {@link ColumnarBatch} objects and takes care of closing them.
+   * The caller should not hold onto a {@link ColumnarBatch} or try to close them.
+   *
+   * <p>If {@code reuseContainers} is {@code false}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} are closed before returning the next {@link ColumnarBatch} object.
+   * This implies that the caller should either use the {@link ColumnarBatch} or transfer the ownership of
+   * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   *
+   * <p>If {@code reuseContainers} is {@code true}, the Arrow vectors in the
+   * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
+   * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
+   * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   */
+  public CloseableIterator<ColumnarBatch> open(CloseableIterable<CombinedScanTask> tasks) {
+ CloseableIterator<ColumnarBatch> itr = new VectorizedCombinedScanIterator(
+ tasks,
+ schema,
+ null,
+ io,
+ encryption,
+ true,
+ batchSize,
+ reuseContainers
+ );
+ addCloseable(itr);
+ return itr;
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close(); // close data files
+ }
+
+ /**
+ * Reads the data file and returns an iterator of {@link VectorSchemaRoot}.
+ * Only Parquet data file format is supported.
+ */
+  private static final class VectorizedCombinedScanIterator implements CloseableIterator<ColumnarBatch> {
+
+ private final Iterator<FileScanTask> fileItr;
+ private final Map<String, InputFile> inputFiles;
+ private final Schema expectedSchema;
+ private final String nameMapping;
+ private final boolean caseSensitive;
+ private final int batchSize;
+ private final boolean reuseContainers;
+ private CloseableIterator<ColumnarBatch> currentIterator;
+ private ColumnarBatch current;
+ private FileScanTask currentTask;
+
+    /**
+     * Create a new instance.
+     *
+     * @param tasks             Combined file scan tasks.
+     * @param expectedSchema    Read schema. The returned data will have this schema.
+     * @param nameMapping       Mapping from external schema names to Iceberg type IDs.
+     * @param io                File I/O.
+     * @param encryptionManager Encryption manager.
+     * @param caseSensitive     If {@code true}, column names are case sensitive.
+     *                          If {@code false}, column names are not case sensitive.
+     * @param batchSize         Batch size in number of rows. Each Arrow batch contains
+     *                          a maximum of {@code batchSize} rows.
+     * @param reuseContainers   If set to {@code false}, every {@link Iterator#next()} call creates
+     *                          new instances of Arrow vectors.
+     *                          If set to {@code true}, the Arrow vectors in the previous
+     *                          {@link Iterator#next()} may be reused for the data returned
+     *                          in the current {@link Iterator#next()}.
+     *                          This option avoids allocating memory again and again.
+     *                          Irrespective of the value of {@code reuseContainers}, the Arrow vectors
+     *                          in the previous {@link Iterator#next()} call are closed before creating
+     *                          new instances in the current {@link Iterator#next()}.
+     */
+ VectorizedCombinedScanIterator(
+ CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+          .map(CombinedScanTask::files)
+          .flatMap(Collection::stream)
+          .collect(Collectors.toList());
+      this.fileItr = fileTasks.iterator();
+      Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
+      fileTasks.stream()
+          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
Review comment:
rather than allocating this to a `Map`, can this be directly transformed into a `Stream`?
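An editor's sketch of that suggestion (not the PR's code): map each content and delete file straight to an `EncryptedInputFile`, skipping the intermediate `keyMetadata` map. The map also de-duplicated repeated paths, so this assumes duplicates are tolerable here (the later `putIfAbsent` over the decrypted files still collapses them):

    // Sketch only: stream the files directly into EncryptedInputFile instances.
    Stream<EncryptedInputFile> encrypted = fileTasks.stream()
        .flatMap(task -> Stream.concat(Stream.of(task.file()), task.deletes().stream()))
        .map(file -> EncryptedFiles.encryptedInput(
            io.newInputFile(file.path().toString()), file.keyMetadata()));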
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,331 @@
+      Stream<EncryptedInputFile> encrypted = keyMetadata.entrySet().stream()
+          .map(entry -> EncryptedFiles.encryptedInput(io.newInputFile(entry.getKey()), entry.getValue()));
+
+      // decrypt with the batch call to avoid multiple RPCs to a key server, if possible
+      Iterable<InputFile> decryptedFiles = encryptionManager.decrypt(encrypted::iterator);
+
+      Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(fileTasks.size());
Review comment:
similarly, this looks like it could just use a `Collectors.toMap`
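An editor's sketch of that suggestion (assuming first-wins merge semantics to mirror the original `putIfAbsent`):

    // Sketch only: collect the decrypted files straight into an unmodifiable map keyed by location.
    this.inputFiles = Collections.unmodifiableMap(
        StreamSupport.stream(decryptedFiles.spliterator(), false)
            .collect(Collectors.toMap(InputFile::location, file -> file, (first, second) -> first)));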
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
##########
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/**
+ * A collection of vectorized readers per column (in the expected read schema) and Arrow Vector holders.
+ * This class owns the Arrow vectors and is responsible for closing the Arrow vectors.
+ */
+class ArrowBatchReader implements VectorizedReader<ColumnarBatch> {
+
+ private final VectorizedArrowReader[] readers;
+ private final VectorHolder[] vectorHolders;
+
+ ArrowBatchReader(List<VectorizedReader<?>> readers) {
+ this.readers = readers.stream()
+ .map(VectorizedArrowReader.class::cast)
+ .toArray(VectorizedArrowReader[]::new);
+ this.vectorHolders = new VectorHolder[readers.size()];
+ }
+
+ @Override
+  public final void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+ for (VectorizedArrowReader reader : readers) {
Review comment:
this implies the reader could be `null`? How/why would that happen?
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,331 @@
+      Map<String, InputFile> files = Maps.newHashMapWithExpectedSize(fileTasks.size());
+      decryptedFiles.forEach(decrypted -> files.putIfAbsent(decrypted.location(), decrypted));
+ this.inputFiles = Collections.unmodifiableMap(files);
+
+ this.currentIterator = CloseableIterator.empty();
+ this.expectedSchema = expectedSchema;
+ this.nameMapping = nameMapping;
+ this.caseSensitive = caseSensitive;
+ this.batchSize = batchSize;
+ this.reuseContainers = reuseContainers;
+ }
+
+ @Override
+ public boolean hasNext() {
+ try {
+ while (true) {
+ if (currentIterator.hasNext()) {
+ this.current = currentIterator.next();
+ return true;
+ } else if (fileItr.hasNext()) {
+ this.currentIterator.close();
+ this.currentTask = fileItr.next();
+ this.currentIterator = open(currentTask);
+ } else {
+ this.currentIterator.close();
+ return false;
+ }
+ }
+ } catch (IOException | RuntimeException e) {
+ if (currentTask != null && !currentTask.isDataTask()) {
+          LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
Review comment:
Can the error message be of the form "Cannot do X: reason. Fix, if one exists"?
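For illustration, an editor's sketch of that message style (wording hypothetical):

    // Sketch only: name the action that failed; the reason travels with the exception.
    LOG.error("Cannot read file {}: failed while iterating Arrow batches", getInputFile(currentTask).location(), e);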
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -0,0 +1,331 @@
+ } catch (IOException | RuntimeException e) {
+ if (currentTask != null && !currentTask.isDataTask()) {
+          LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
+ }
+ throw new RuntimeException(e);
Review comment:
can you add a message/more context to this exception?
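An editor's sketch of what that could look like (the wording and the choice of a plain RuntimeException are assumptions, not the PR's final code):

    // Sketch only: carry the failing file's location in the exception message.
    throw new RuntimeException("Cannot read Arrow batches from file: " + getInputFile(currentTask).location(), e);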
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.nio.charset.StandardCharsets;
+import java.util.function.Supplier;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.iceberg.arrow.vectorized.GenericArrowVectorAccessorFactory.StringFactory;
+
+final class ArrowVectorAccessors {
+
+  private static final GenericArrowVectorAccessorFactory<?, String, ?, ?> factory;
+
+ static {
+ factory = new GenericArrowVectorAccessorFactory<>(
+ throwingSupplier("Decimal type is not supported"),
+ JavaStringFactory::new,
+ throwingSupplier("Struct type is not supported"),
+ throwingSupplier("List type is not supported")
+ );
+ }
+
+ private static <T> Supplier<T> throwingSupplier(String message) {
+ return () -> {
+ throw new UnsupportedOperationException(message);
+ };
+ }
+
+ private ArrowVectorAccessors() {
+ throw new UnsupportedOperationException();
Review comment:
can you add a message to this exception?
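For instance (an editor's sketch; the message wording is hypothetical):

    // Sketch only: say why instantiation is disallowed.
    throw new UnsupportedOperationException("ArrowVectorAccessors is a utility class and cannot be instantiated");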
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/GenericArrowVectorAccessorFactory.java
##########
@@ -0,0 +1,693 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.function.IntFunction;
+import java.util.function.Supplier;
+import java.util.stream.IntStream;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.util.DecimalUtility;
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.PrimitiveType;
+
+/**
+ * This class creates typed {@link ArrowVectorAccessor} from {@link VectorHolder}.
+ * It provides a generic implementation for the following Arrow types:
+ * <ul>
+ *   <li>Decimal type can be deserialized to a type that supports decimal,
+ *   e.g. BigDecimal or Spark's Decimal.</li>
+ *   <li>UTF8 String type can be deserialized to a Java String or Spark's UTF8String.</li>
+ *   <li>List type: the child elements of a list can be deserialized to Spark's ColumnarArray or similar type.</li>
+ *   <li>Struct type: the child elements of a struct can be deserialized to Spark's ArrowColumnVector
+ *   or similar type.</li>
+ * </ul>
+ * @param <DecimalT> A concrete type that can represent a decimal.
+ * @param <Utf8StringT> A concrete type that can represent a UTF8 string.
+ * @param <ArrayT> A concrete type that can represent an array value in a list vector, e.g. Spark's ColumnarArray.
+ * @param <ChildVectorT> A concrete type that can represent a child vector in a struct, e.g. Spark's ArrowColumnVector.
+ */
+public class GenericArrowVectorAccessorFactory<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> {
+
+ private final Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier;
+ private final Supplier<StringFactory<Utf8StringT>> stringFactorySupplier;
+  private final Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier;
+  private final Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier;
+
+ /**
+   * The constructor is parameterized using the decimal, string, struct and array factories.
+ * If a specific type is not supported, the factory supplier can raise an
+ * {@link UnsupportedOperationException}.
+ */
+ protected GenericArrowVectorAccessorFactory(
+ Supplier<DecimalFactory<DecimalT>> decimalFactorySupplier,
+ Supplier<StringFactory<Utf8StringT>> stringFactorySupplier,
+          Supplier<StructChildFactory<ChildVectorT>> structChildFactorySupplier,
+ Supplier<ArrayFactory<ChildVectorT, ArrayT>> arrayFactorySupplier) {
+ this.decimalFactorySupplier = decimalFactorySupplier;
+ this.stringFactorySupplier = stringFactorySupplier;
+ this.structChildFactorySupplier = structChildFactorySupplier;
+ this.arrayFactorySupplier = arrayFactorySupplier;
+ }
+
+  public ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getVectorAccessor(VectorHolder holder) {
+ Dictionary dictionary = holder.dictionary();
+ boolean isVectorDictEncoded = holder.isDictionaryEncoded();
+ FieldVector vector = holder.vector();
+ if (isVectorDictEncoded) {
+ ColumnDescriptor desc = holder.descriptor();
+ PrimitiveType primitive = desc.getPrimitiveType();
+ return getDictionaryVectorAccessor(dictionary, desc, vector, primitive);
+ } else {
+ return getPlainVectorAccessor(vector);
+ }
+ }
+
+  private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> getDictionaryVectorAccessor(
+ Dictionary dictionary,
+ ColumnDescriptor desc,
+ FieldVector vector, PrimitiveType primitive) {
+    Preconditions.checkState(vector instanceof IntVector, "Dictionary ids should be stored in IntVectors only");
+ if (primitive.getOriginalType() != null) {
+ switch (desc.getPrimitiveType().getOriginalType()) {
+ case ENUM:
+ case JSON:
+ case UTF8:
+ case BSON:
+          return new DictionaryStringAccessor<>((IntVector) vector, dictionary, stringFactorySupplier.get());
+ case INT_64:
+ case TIMESTAMP_MILLIS:
+ case TIMESTAMP_MICROS:
+ return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+ case DECIMAL:
+ switch (primitive.getPrimitiveTypeName()) {
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return new DictionaryDecimalBinaryAccessor<>(
+ (IntVector) vector,
+ dictionary,
+ decimalFactorySupplier.get());
+ case INT64:
+ return new DictionaryDecimalLongAccessor<>(
+ (IntVector) vector,
+ dictionary,
+ decimalFactorySupplier.get());
+ case INT32:
+ return new DictionaryDecimalIntAccessor<>(
+ (IntVector) vector,
+ dictionary,
+ decimalFactorySupplier.get());
+ default:
+              throw new UnsupportedOperationException(
+                  "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName());
+ }
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported logical type: " + primitive.getOriginalType());
+ }
+ } else {
+ switch (primitive.getPrimitiveTypeName()) {
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+          return new DictionaryBinaryAccessor<>((IntVector) vector, dictionary);
+        case FLOAT:
+          return new DictionaryFloatAccessor<>((IntVector) vector, dictionary);
+        case INT64:
+          return new DictionaryLongAccessor<>((IntVector) vector, dictionary);
+        case DOUBLE:
+          return new DictionaryDoubleAccessor<>((IntVector) vector, dictionary);
+        default:
+          throw new UnsupportedOperationException("Unsupported type: " + primitive);
+ }
+ }
+ }
+
+ @SuppressWarnings("checkstyle:CyclomaticComplexity")
+ private ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT>
+ getPlainVectorAccessor(FieldVector vector) {
+ if (vector instanceof BitVector) {
+ return new BooleanAccessor<>((BitVector) vector);
+ } else if (vector instanceof IntVector) {
+ return new IntAccessor<>((IntVector) vector);
+ } else if (vector instanceof BigIntVector) {
+ return new LongAccessor<>((BigIntVector) vector);
+ } else if (vector instanceof Float4Vector) {
+ return new FloatAccessor<>((Float4Vector) vector);
+ } else if (vector instanceof Float8Vector) {
+ return new DoubleAccessor<>((Float8Vector) vector);
+ } else if (vector instanceof DecimalVector) {
+      return new DecimalAccessor<>((DecimalVector) vector, decimalFactorySupplier.get());
+    } else if (vector instanceof VarCharVector) {
+      return new StringAccessor<>((VarCharVector) vector, stringFactorySupplier.get());
+ } else if (vector instanceof VarBinaryVector) {
+ return new BinaryAccessor<>((VarBinaryVector) vector);
+ } else if (vector instanceof DateDayVector) {
+ return new DateAccessor<>((DateDayVector) vector);
+ } else if (vector instanceof TimeStampMicroTZVector) {
+ return new TimestampMicroTzAccessor<>((TimeStampMicroTZVector) vector);
+ } else if (vector instanceof TimeStampMicroVector) {
+ return new TimestampMicroAccessor<>((TimeStampMicroVector) vector);
+ } else if (vector instanceof ListVector) {
+ ListVector listVector = (ListVector) vector;
+ return new ArrayAccessor<>(listVector, arrayFactorySupplier.get());
+ } else if (vector instanceof StructVector) {
+ StructVector structVector = (StructVector) vector;
+      return new StructAccessor<>(structVector, structChildFactorySupplier.get());
+    }
+    throw new UnsupportedOperationException("Unsupported vector: " + vector.getClass());
+ }
+
+  private static class BooleanAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final BitVector vector;
+
+ BooleanAccessor(BitVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final boolean getBoolean(int rowId) {
+ return vector.get(rowId) == 1;
+ }
+ }
+
+  private static class IntAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+      extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final IntVector vector;
+
+ IntAccessor(IntVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final int getInt(int rowId) {
+ return vector.get(rowId);
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return getInt(rowId);
+ }
+ }
+
+ private static class LongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final BigIntVector vector;
+
+ LongAccessor(BigIntVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
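+ // The Dictionary*Accessor variants below decode the entire Parquet dictionary once
+ // in the constructor, so every per-row read is a plain array lookup by offset.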
+ private static class DictionaryLongAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final long[] decodedDictionary;
+
+ DictionaryLongAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToLong(dictionary::decodeToLong)
+ .toArray();
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+ }
+
+ private static class FloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final Float4Vector vector;
+
+ FloatAccessor(Float4Vector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final float getFloat(int rowId) {
+ return vector.get(rowId);
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return getFloat(rowId);
+ }
+ }
+
+ private static class DictionaryFloatAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final float[] decodedDictionary;
+
+ DictionaryFloatAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = new float[dictionary.getMaxId() + 1];
+ for (int i = 0; i <= dictionary.getMaxId(); i++) {
+ decodedDictionary[i] = dictionary.decodeToFloat(i);
+ }
+ }
+
+ @Override
+ public final float getFloat(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return getFloat(rowId);
+ }
+ }
+
+ private static class DoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final Float8Vector vector;
+
+ DoubleAccessor(Float8Vector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
+ private static class DictionaryDoubleAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final double[] decodedDictionary;
+
+ DictionaryDoubleAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToDouble(dictionary::decodeToDouble)
+ .toArray();
+ }
+
+ @Override
+ public final double getDouble(int rowId) {
+ return decodedDictionary[offsetVector.get(rowId)];
+ }
+ }
+
+ private static class StringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final VarCharVector vector;
+ private final StringFactory<Utf8StringT> stringFactory;
+
+ StringAccessor(VarCharVector vector, StringFactory<Utf8StringT> stringFactory) {
+ super(vector);
+ this.vector = vector;
+ this.stringFactory = stringFactory;
+ }
+
+ @Override
+ public final Utf8StringT getUTF8String(int rowId) {
+ return stringFactory.ofRow(vector, rowId);
+ }
+ }
+
+ private static class DictionaryStringAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final Utf8StringT[] decodedDictionary;
+ private final IntVector offsetVector;
+
+ DictionaryStringAccessor(IntVector vector, Dictionary dictionary, StringFactory<Utf8StringT> stringFactory) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToObj(dictionary::decodeToBinary)
+ .map(binary -> stringFactory.ofBytes(binary.getBytes()))
+ .toArray(genericArray(stringFactory.getGenericClass()));
+ }
+
+ @Override
+ public final Utf8StringT getUTF8String(int rowId) {
+ int offset = offsetVector.get(rowId);
+ return decodedDictionary[offset];
+ }
+ }
+
+ private static class BinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final VarBinaryVector vector;
+
+ BinaryAccessor(VarBinaryVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final byte[] getBinary(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
+ private static class DictionaryBinaryAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ private final IntVector offsetVector;
+ private final byte[][] decodedDictionary;
+
+ DictionaryBinaryAccessor(IntVector vector, Dictionary dictionary) {
+ super(vector);
+ this.offsetVector = vector;
+ this.decodedDictionary = IntStream.rangeClosed(0, dictionary.getMaxId())
+ .mapToObj(dictionary::decodeToBinary)
+ .map(Binary::getBytes)
+ .toArray(byte[][]::new);
+ }
+
+ @Override
+ public final byte[] getBinary(int rowId) {
+ int offset = offsetVector.get(rowId);
+ return decodedDictionary[offset];
+ }
+ }
+
+ private static class DateAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final DateDayVector vector;
+
+ DateAccessor(DateDayVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final int getInt(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
+ private static class TimestampMicroTzAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final TimeStampMicroTZVector vector;
+
+ TimestampMicroTzAccessor(TimeStampMicroTZVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
+ private static class TimestampMicroAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final TimeStampMicroVector vector;
+
+ TimestampMicroAccessor(TimeStampMicroVector vector) {
+ super(vector);
+ this.vector = vector;
+ }
+
+ @Override
+ public final long getLong(int rowId) {
+ return vector.get(rowId);
+ }
+ }
+
+ private static class ArrayAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final ListVector vector;
+ private final ChildVectorT arrayData;
+ private final ArrayFactory<ChildVectorT, ArrayT> arrayFactory;
+
+ ArrayAccessor(ListVector vector, ArrayFactory<ChildVectorT, ArrayT> arrayFactory) {
+ super(vector);
+ this.vector = vector;
+ this.arrayFactory = arrayFactory;
+ this.arrayData = arrayFactory.ofChild(vector.getDataVector());
+ }
+
+ @Override
+ public final ArrayT getArray(int rowId) {
+ return arrayFactory.ofRow(vector, arrayData, rowId);
+ }
+ }
+
+ private static class StructAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
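+ // Wraps each child vector of the struct using the struct-child factory and hands
+ // the resulting children to the base class.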
+ StructAccessor(StructVector structVector, StructChildFactory<ChildVectorT> structChildFactory) {
+ super(structVector, IntStream.range(0, structVector.size())
+ .mapToObj(structVector::getVectorById)
+ .map(structChildFactory::of)
+ .toArray(genericArray(structChildFactory.getGenericClass())));
+ }
+ }
+
+ private static class DecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+
+ private final DecimalVector vector;
+ private final DecimalFactory<DecimalT> decimalFactory;
+
+ DecimalAccessor(DecimalVector vector, DecimalFactory<DecimalT> decimalFactory) {
+ super(vector);
+ this.vector = vector;
+ this.decimalFactory = decimalFactory;
+ }
+
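+ // Reads the unscaled value directly from the Arrow data buffer and converts it
+ // to the caller's decimal representation via the decimal factory.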
+ @Override
+ public final DecimalT getDecimal(int rowId, int precision, int scale) {
+ return decimalFactory.ofBigDecimal(
+ DecimalUtility.getBigDecimalFromArrowBuf(vector.getDataBuffer(), rowId, scale),
+ precision, scale);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:VisibilityModifier")
+ private abstract static class DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
+ extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {
+ final DecimalT[] cache;
Review comment:
why not private?
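For illustration, one way to tighten this up (a hypothetical sketch, not code
from this PR; the constructor shape and helper names are assumed):

```java
// Sketch: keep the decoded-decimal cache private to the abstract base class
// and give subclasses protected helpers instead of direct field access.
private abstract static class DictionaryDecimalAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable>
    extends ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT> {

  private final DecimalT[] cache;  // private rather than package-visible

  protected DictionaryDecimalAccessor(ValueVector vector, DecimalT[] cache) {
    super(vector);
    this.cache = cache;
  }

  // subclasses read and populate the cache only through these helpers
  protected DecimalT cachedValue(int index) {
    return cache[index];
  }

  protected void cacheValue(int index, DecimalT value) {
    cache[index] = value;
  }
}
```

Keeping the field private would also let the base class change its caching
strategy later without touching the concrete accessors.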
##########
File path:
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessor.java
##########
@@ -17,79 +17,82 @@
* under the License.
*/
-package org.apache.iceberg.spark.data.vectorized;
+package org.apache.iceberg.arrow.vectorized;
import org.apache.arrow.vector.ValueVector;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.vectorized.ArrowColumnVector;
-import org.apache.spark.sql.vectorized.ColumnarArray;
-import org.apache.spark.unsafe.types.UTF8String;
-
-@SuppressWarnings("checkstyle:VisibilityModifier")
-public abstract class ArrowVectorAccessor {
+public class ArrowVectorAccessor<DecimalT, Utf8StringT, ArrayT, ChildVectorT extends AutoCloseable> {
private final ValueVector vector;
- private final ArrowColumnVector[] childColumns;
+ private final ChildVectorT[] childColumns;
- ArrowVectorAccessor(ValueVector vector) {
- this.vector = vector;
- this.childColumns = new ArrowColumnVector[0];
+ protected ArrowVectorAccessor(ValueVector vector) {
+ this(vector, null);
}
- ArrowVectorAccessor(ValueVector vector, ArrowColumnVector[] children) {
+ protected ArrowVectorAccessor(ValueVector vector, ChildVectorT[] children) {
this.vector = vector;
this.childColumns = children;
}
- final void close() {
- for (ArrowColumnVector column : childColumns) {
- // Closing an ArrowColumnVector is expected to not throw any exception
- column.close();
+ public final void close() {
Review comment:
how come final? Does that still make sense now that we are using this as a
base class?
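If the goal is to let subclasses release their own resources while keeping the
child-column cleanup mandatory, a template-method shape could answer both
concerns (a sketch under that assumption, not code from this PR;
closeInternal() is a hypothetical hook):

```java
// Sketch: close() stays final so the child-column cleanup can never be
// skipped, while subclasses override a protected hook for extra resources.
public final void close() {
  try {
    if (childColumns != null) {
      for (ChildVectorT child : childColumns) {
        child.close();  // ChildVectorT extends AutoCloseable, so this may throw
      }
    }
  } catch (Exception e) {
    throw new RuntimeException("Failed to close child columns", e);
  }
  closeInternal();  // hypothetical hook for subclass-owned resources
  vector.close();   // ValueVector implements Closeable
}

protected void closeInternal() {
  // default: nothing extra to release
}
```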
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]