mayursrivastava commented on a change in pull request #2933:
URL: https://github.com/apache/iceberg/pull/2933#discussion_r683093034
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -252,17 +297,11 @@ public boolean hasNext() {
}
} catch (IOException | RuntimeException e) {
if (currentTask != null && !currentTask.isDataTask()) {
- throw new RuntimeException(
- "Error reading file: " + getInputFile(currentTask).location() +
- ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
- "Ensure that the tasks passed to the constructor are data tasks. " +
- "The file scan tasks are: " + fileTasks,
- e);
- } else {
- throw new RuntimeException(
- "An error occurred while iterating through the file scan tasks or closing the iterator," +
- " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+ LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
}
+ // Wrap and throw a Runtime exception because Iterator::hasNext()
+ // cannot throw a checked exception.
+ throw new RuntimeException(e);
Review comment:
Agreed. ExceptionUtil::castAndThrow was not public, so I've made it public in order to use it here. I hope that is OK.
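For readers following the exception-handling thread: the underlying constraint is that Iterator.hasNext() cannot declare IOException, so checked failures must be converted while unchecked ones should ideally pass through untouched. The sketch below is a hypothetical, self-contained illustration of that pattern; `Rethrow`, `IoCursor` and `CursorIterator` are invented names, and this is not Iceberg's ExceptionUtil implementation.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical helper in the spirit of the castAndThrow discussion; NOT Iceberg's ExceptionUtil.
final class Rethrow {
  private Rethrow() {}

  // Never returns normally. The RuntimeException return type lets callers write
  // "throw Rethrow.unchecked(e);" so the compiler knows the branch terminates.
  static RuntimeException unchecked(Throwable e) {
    if (e instanceof RuntimeException) {
      throw (RuntimeException) e; // unchecked exceptions are rethrown unchanged
    }
    if (e instanceof Error) {
      throw (Error) e;
    }
    if (e instanceof IOException) {
      throw new UncheckedIOException((IOException) e); // keeps the IOException as the cause
    }
    throw new RuntimeException(e);
  }
}

// Hypothetical source whose advance step can fail with a checked IOException.
interface IoCursor {
  boolean advance() throws IOException;
}

// Iterator.hasNext() cannot declare IOException, so failures are converted here.
final class CursorIterator implements Iterator<Integer> {
  private final IoCursor cursor;
  private boolean ready = false;
  private int count = 0;

  CursorIterator(IoCursor cursor) {
    this.cursor = cursor;
  }

  @Override
  public boolean hasNext() {
    if (ready) {
      return true;
    }
    try {
      ready = cursor.advance();
      return ready;
    } catch (IOException | RuntimeException e) {
      throw Rethrow.unchecked(e);
    }
  }

  @Override
  public Integer next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    ready = false;
    return count++;
  }
}
```

Compared with always wrapping in `new RuntimeException(e)`, this keeps the original exception type for unchecked failures, so callers that catch specific runtime exceptions still see them.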
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -84,11 +92,26 @@
* {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- * <li>Iceberg v2 spec is not supported.
+ * <li>Delete files are not supported.
* See https://github.com/apache/iceberg/issues/2487.</li>
* </ul>
*/
public class ArrowReader extends CloseableGroup {
+ private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+ private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+ TypeID.BOOLEAN,
+ TypeID.INTEGER,
+ TypeID.LONG,
+ TypeID.FLOAT,
+ TypeID.DOUBLE,
+ TypeID.STRING,
+ TypeID.TIMESTAMP,
+ TypeID.BINARY,
+ TypeID.DATE,
+ TypeID.UUID,
+ TypeID.TIME
Review comment:
fixed
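As context for the SUPPORTED_TYPES set above, here is a minimal sketch of how such a membership check can be expressed, assuming an enum-based type ID. `TypeId` is a hypothetical stand-in for Iceberg's Type.TypeID so the example compiles on its own, and the set copies only the eleven IDs visible in the hunk, which is cut off at TIME.

```java
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for Iceberg's Type.TypeID, so the sketch is self-contained.
enum TypeId {
  BOOLEAN, INTEGER, LONG, FLOAT, DOUBLE, DATE, TIME, TIMESTAMP, STRING, UUID,
  FIXED, BINARY, DECIMAL, STRUCT, LIST, MAP
}

final class SupportedTypes {
  private SupportedTypes() {}

  // The IDs visible in the diff hunk. EnumSet stores them as a bit vector,
  // so contains() is a cheap bit test.
  static final Set<TypeId> SUPPORTED = Collections.unmodifiableSet(EnumSet.of(
      TypeId.BOOLEAN, TypeId.INTEGER, TypeId.LONG, TypeId.FLOAT, TypeId.DOUBLE,
      TypeId.STRING, TypeId.TIMESTAMP, TypeId.BINARY, TypeId.DATE, TypeId.UUID,
      TypeId.TIME));

  // Same shape as the hasSupportedTypes check: every projected column must be supported.
  static boolean allSupported(List<TypeId> columnTypes) {
    return columnTypes.stream().allMatch(SUPPORTED::contains);
  }

  // Names the offending types, which can make an error message more actionable.
  static List<TypeId> unsupported(List<TypeId> columnTypes) {
    return columnTypes.stream()
        .filter(t -> !SUPPORTED.contains(t))
        .collect(Collectors.toList());
  }
}
```

Note that `allMatch` is vacuously true for an empty stream, so a type check alone would accept a projection with no columns; the separate atLeastOneColumn check in the constructor covers that case.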
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -136,6 +159,14 @@ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
* previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
* This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
* {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+ *
+ * <p>This method works for only when the following conditions are true:
Review comment:
fixed
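To make the reuseContainers caveat in the javadoc above concrete, here is a hypothetical sketch in which `int[]` batches stand in for ColumnarBatch. `ReusingBatchReader` and `deepCopy` are invented names, and real Arrow vectors are reused differently; the point is only that a caller holding a reference to a reused batch sees it overwritten by the next call unless it copies first.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical reader: int[] batches stand in for ColumnarBatch.
final class ReusingBatchReader implements Iterator<int[]> {
  private final int[] source;
  private final int batchSize;
  private final boolean reuseContainers;
  private int[] reused; // buffer handed out on every call when reuseContainers is true
  private int pos = 0;

  ReusingBatchReader(int[] source, int batchSize, boolean reuseContainers) {
    this.source = source;
    this.batchSize = batchSize;
    this.reuseContainers = reuseContainers;
  }

  @Override
  public boolean hasNext() {
    return pos < source.length;
  }

  @Override
  public int[] next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    int n = Math.min(batchSize, source.length - pos);
    int[] batch;
    if (reuseContainers) {
      // Overwrite the previous batch in place; reallocate only if the size changes.
      if (reused == null || reused.length != n) {
        reused = new int[n];
      }
      batch = reused;
    } else {
      batch = new int[n];
    }
    System.arraycopy(source, pos, batch, 0, n);
    pos += n;
    return batch;
  }

  // What a caller must do to keep a batch beyond the next call: deep copy it.
  static int[] deepCopy(int[] batch) {
    return Arrays.copyOf(batch, batch.length);
  }
}
```

With reuse enabled, two consecutive `next()` calls return the same array, so the first batch's contents are replaced by the second's; copying before the next call preserves them.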
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
Review comment:
done
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+ List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ .map(CombinedScanTask::files)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
+ boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+ boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+ boolean hasSupportedTypes = expectedSchema.columns().stream()
+ .map(c -> c.type().typeId())
+ .allMatch(SUPPORTED_TYPES::contains);
+ if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+ throw new UnsupportedOperationException(
+ "ArrowReader is supported for the query schema with at least one column," +
+ " with no delete files and for supported data types" +
+ ", but found that atLeastOneColumn=" + atLeastOneColumn +
+ ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+ ", hasSupportedTypes=" + hasSupportedTypes +
+ ", supported types=" + SUPPORTED_TYPES +
Review comment:
Fixed
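The constructor change above does two things: it flattens the combined scan tasks into a list of file tasks and rejects scans it cannot handle. The following is a hypothetical sketch of that flow; `FileTask`, `CombinedTask` and `ScanValidation` are invented stand-ins for FileScanTask and CombinedScanTask, and the message format, which names only the failed conditions, is an alternative style rather than the one in the diff.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical stand-in for FileScanTask.
final class FileTask {
  final String path;
  final List<String> deletePaths;

  FileTask(String path, List<String> deletePaths) {
    this.path = path;
    this.deletePaths = deletePaths;
  }

  boolean hasDeletes() {
    return !deletePaths.isEmpty();
  }
}

// Hypothetical stand-in for CombinedScanTask.
final class CombinedTask {
  final List<FileTask> files;

  CombinedTask(List<FileTask> files) {
    this.files = files;
  }
}

final class ScanValidation {
  private ScanValidation() {}

  // Same pipeline shape as the diff: Iterable -> Stream, then flatten each task's files.
  static List<FileTask> flatten(Iterable<CombinedTask> tasks) {
    return StreamSupport.stream(tasks.spliterator(), false)
        .map(t -> t.files)
        .flatMap(Collection::stream)
        .collect(Collectors.toList());
  }

  // Collects every failed precondition so the message names exactly what went wrong.
  static void validate(List<FileTask> fileTasks, int columnCount, boolean hasSupportedTypes) {
    List<String> problems = new ArrayList<>();
    if (columnCount == 0) {
      problems.add("the query schema has no columns");
    }
    if (fileTasks.stream().anyMatch(FileTask::hasDeletes)) {
      problems.add("at least one file scan task has delete files");
    }
    if (!hasSupportedTypes) {
      problems.add("the query schema contains unsupported types");
    }
    if (!problems.isEmpty()) {
      throw new UnsupportedOperationException(
          "ArrowReader cannot read this scan: " + String.join("; ", problems));
    }
  }
}
```

Validating in the constructor means an unsupported scan fails before any file is opened, instead of part-way through iteration.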
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+ List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ .map(CombinedScanTask::files)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
+ boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+ boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+ boolean hasSupportedTypes = expectedSchema.columns().stream()
+ .map(c -> c.type().typeId())
+ .allMatch(SUPPORTED_TYPES::contains);
+ if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+ throw new UnsupportedOperationException(
+ "ArrowReader is supported for the query schema with at least one column," +
+ " with no delete files and for supported data types" +
+ ", but found that atLeastOneColumn=" + atLeastOneColumn +
+ ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+ ", hasSupportedTypes=" + hasSupportedTypes +
+ ", supported types=" + SUPPORTED_TYPES +
+ ", expected columns=" + expectedSchema.columns()
+ );
+ }
+
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
fileTasks.stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ .map(FileScanTask::file)
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-
Review comment:
fixed
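The key-metadata change above indexes only data files, which is consistent with delete files being rejected earlier in the constructor. The hypothetical sketch below (`DataFileRef` and `KeyMetadataIndex` are invented names) also illustrates why a forEach/put loop into a HashMap is a reasonable choice here: key metadata can be null for unencrypted files, and if one data file is split across several FileScanTasks, as can happen for large files, the same path appears more than once. HashMap.put tolerates both, whereas Collectors.toMap throws on null values and on duplicate keys.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the data file behind a FileScanTask.
final class DataFileRef {
  final String path;
  final ByteBuffer keyMetadata; // may be null when the file is not encrypted

  DataFileRef(String path, ByteBuffer keyMetadata) {
    this.path = path;
    this.keyMetadata = keyMetadata;
  }
}

final class KeyMetadataIndex {
  private KeyMetadataIndex() {}

  // Indexes data files only. HashMap.put accepts null values and simply
  // overwrites a repeated path; Collectors.toMap would throw in either case.
  static Map<String, ByteBuffer> build(List<DataFileRef> dataFiles) {
    Map<String, ByteBuffer> keyMetadata = new HashMap<>();
    dataFiles.forEach(file -> keyMetadata.put(file.path, file.keyMetadata));
    return keyMetadata;
  }
}
```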
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -323,10 +362,9 @@ private InputFile getInputFile(FileScanTask task) {
* @param fileSchema Schema of the data file.
* @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
*/
- private static ArrowBatchReader buildReader(
- Schema expectedSchema,
- MessageType fileSchema,
- boolean setArrowValidityVector) {
+ private static ArrowBatchReader buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector) {
Review comment:
fixed
--
This is an automated message from the Apache Git Service.