mayursrivastava commented on a change in pull request #2933:
URL: https://github.com/apache/iceberg/pull/2933#discussion_r683093034



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -252,17 +297,11 @@ public boolean hasNext() {
         }
       } catch (IOException | RuntimeException e) {
         if (currentTask != null && !currentTask.isDataTask()) {
-          throw new RuntimeException(
-              "Error reading file: " + getInputFile(currentTask).location() +
-                  ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
-                  "Ensure that the tasks passed to the constructor are data tasks. " +
-                  "The file scan tasks are: " + fileTasks,
-              e);
-        } else {
-          throw new RuntimeException(
-              "An error occurred while iterating through the file scan tasks or closing the iterator," +
-                  " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+          LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
         }
+        // Wrap and throw a Runtime exception because Iterator::hasNext()
+        // cannot throw a checked exception.
+        throw new RuntimeException(e);

Review comment:
       Agreed. ExceptionUtil::castAndThrow was not public, so I've made it public in order to use it here. I hope that is ok.
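The diff above wraps the checked `IOException` because `Iterator::hasNext()` cannot declare checked exceptions. A minimal standalone sketch of that pattern (illustrative names, not the Iceberg implementation):

```java
import java.io.IOException;
import java.util.Iterator;

// Illustrative sketch, not the Iceberg code: Iterator.hasNext() cannot
// declare checked exceptions, so an IOException raised while advancing
// the underlying reader must be wrapped in a RuntimeException.
class WrappingIterator implements Iterator<String> {

  // Hypothetical stand-in for the file-scan read logic.
  interface ThrowingSupplier {
    String get() throws IOException;
  }

  private final ThrowingSupplier source;
  private String next;

  WrappingIterator(ThrowingSupplier source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    try {
      next = source.get();
      return next != null;
    } catch (IOException e) {
      // Wrap because the Iterator contract does not allow checked exceptions.
      throw new RuntimeException(e);
    }
  }

  @Override
  public String next() {
    return next;
  }
}
```

A helper like `ExceptionUtil::castAndThrow` can rethrow the original exception type instead of always wrapping, which preserves the caller's ability to catch the specific exception.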

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -84,11 +92,26 @@
  *     {@link Types.StructType}, {@link Types.FixedType} and
  *     {@link Types.DecimalType}
 *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- *     <li>Iceberg v2 spec is not supported.
+ *     <li>Delete files are not supported.
  *     See https://github.com/apache/iceberg/issues/2487.</li>
  * </ul>
  */
 public class ArrowReader extends CloseableGroup {
+  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+  private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+          TypeID.BOOLEAN,
+          TypeID.INTEGER,
+          TypeID.LONG,
+          TypeID.FLOAT,
+          TypeID.DOUBLE,
+          TypeID.STRING,
+          TypeID.TIMESTAMP,
+          TypeID.BINARY,
+          TypeID.DATE,
+          TypeID.UUID,
+          TypeID.TIME

Review comment:
       fixed

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -136,6 +159,14 @@ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
   * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
   * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
    * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   *
+   * <p>This method works for only when the following conditions are true:

Review comment:
       fixed

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
     *                          in the previous {@link Iterator#next()} call are closed before creating
     *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {

Review comment:
       done

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
     *                          in the previous {@link Iterator#next()} call are closed before creating
     *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+              .map(CombinedScanTask::files)
+              .flatMap(Collection::stream)
+              .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
 
+      boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+      boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+      boolean hasSupportedTypes = expectedSchema.columns().stream()
+              .map(c -> c.type().typeId())
+              .allMatch(SUPPORTED_TYPES::contains);
+      if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+        throw new UnsupportedOperationException(
+                "ArrowReader is supported for the query schema with at least one column," +
+                        " with no delete files and for supported data types" +
+                        ", but found that atLeastOneColumn=" + atLeastOneColumn +
+                        ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+                        ", hasSupportedTypes=" + hasSupportedTypes +
+                        ", supported types=" + SUPPORTED_TYPES +

Review comment:
       Fixed

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
     *                          in the previous {@link Iterator#next()} call are closed before creating
     *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+              .map(CombinedScanTask::files)
+              .flatMap(Collection::stream)
+              .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
 
+      boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+      boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+      boolean hasSupportedTypes = expectedSchema.columns().stream()
+              .map(c -> c.type().typeId())
+              .allMatch(SUPPORTED_TYPES::contains);
+      if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+        throw new UnsupportedOperationException(
+                "ArrowReader is supported for the query schema with at least one column," +
+                        " with no delete files and for supported data types" +
+                        ", but found that atLeastOneColumn=" + atLeastOneColumn +
+                        ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+                        ", hasSupportedTypes=" + hasSupportedTypes +
+                        ", supported types=" + SUPPORTED_TYPES +
+                        ", expected columns=" + expectedSchema.columns()
+        );
+      }
+
       Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
       fileTasks.stream()
-          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(FileScanTask::file)
           .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-

Review comment:
       fixed
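The constructor's up-front validation in the hunk above can be distilled into a small standalone sketch (type IDs simplified to strings; `ArrowReaderPrecondition` and `validate` are illustrative names, not the Iceberg API). The idea is to fail fast with `UnsupportedOperationException` before any iteration starts:

```java
import java.util.List;
import java.util.Set;

// Simplified sketch of the constructor's up-front validation. The real
// code uses Iceberg's TypeID and FileScanTask; these names are illustrative.
class ArrowReaderPrecondition {
  private static final Set<String> SUPPORTED_TYPES = Set.of(
      "BOOLEAN", "INTEGER", "LONG", "FLOAT", "DOUBLE", "STRING",
      "TIMESTAMP", "BINARY", "DATE", "UUID", "TIME");

  static void validate(List<String> columnTypeIds, boolean hasDeleteFiles) {
    boolean atLeastOneColumn = !columnTypeIds.isEmpty();
    boolean hasSupportedTypes =
        columnTypeIds.stream().allMatch(SUPPORTED_TYPES::contains);
    if (!atLeastOneColumn || hasDeleteFiles || !hasSupportedTypes) {
      // Fail fast in the constructor rather than partway through a scan,
      // and report each failed condition by name to ease debugging.
      throw new UnsupportedOperationException(
          "ArrowReader requires at least one column, no delete files, and supported types" +
              ", but found atLeastOneColumn=" + atLeastOneColumn +
              ", hasDeleteFiles=" + hasDeleteFiles +
              ", hasSupportedTypes=" + hasSupportedTypes);
    }
  }
}
```

Reporting each boolean condition separately in the message, as the PR does, tells the caller exactly which precondition failed without having to re-derive it from the schema.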

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -323,10 +362,9 @@ private InputFile getInputFile(FileScanTask task) {
      * @param fileSchema             Schema of the data file.
     * @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
      */
-    private static ArrowBatchReader buildReader(
-        Schema expectedSchema,
-        MessageType fileSchema,
-        boolean setArrowValidityVector) {
+    private static ArrowBatchReader buildReader(Schema expectedSchema,
+                                                MessageType fileSchema,
+                                                boolean setArrowValidityVector) {

Review comment:
       fixed




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


