rdblue commented on a change in pull request #2933:
URL: https://github.com/apache/iceberg/pull/2933#discussion_r682724664



##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -84,11 +92,26 @@
  *     {@link Types.StructType}, {@link Types.FixedType} and
  *     {@link Types.DecimalType}
 *     See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- *     <li>Iceberg v2 spec is not supported.
+ *     <li>Delete files are not supported.
  *     See https://github.com/apache/iceberg/issues/2487.</li>
  * </ul>
  */
 public class ArrowReader extends CloseableGroup {
+  private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+  private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+          TypeID.BOOLEAN,
+          TypeID.INTEGER,
+          TypeID.LONG,
+          TypeID.FLOAT,
+          TypeID.DOUBLE,
+          TypeID.STRING,
+          TypeID.TIMESTAMP,
+          TypeID.BINARY,
+          TypeID.DATE,
+          TypeID.UUID,
+          TypeID.TIME

Review comment:
       Indentation is off throughout this PR. It should be 2 spaces for an indent and 2 indents for a continuation. Here, you should use 4 spaces = 2 indents.
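    
    For example, the continuation here would look roughly like this (a sketch showing only the indentation; it reuses `Set`, `TypeID`, and `ImmutableSet` already imported in this file and elides most of the entries):

```java
// Continuation lines are indented 4 spaces (2 indents) past the start of the declaration.
private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
    TypeID.BOOLEAN,
    TypeID.INTEGER,
    TypeID.LONG);
```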

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -136,6 +159,14 @@ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
   * previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
   * This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
    * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+   *
+   * <p>This method works for only when the following conditions are true:

Review comment:
       Nit: `<p>` normally goes on the line between paragraphs.
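    
    For example (a sketch of the placement only, with placeholder text):

```java
/**
 * Text of the first paragraph.
 * <p>
 * Text of the second paragraph, starting on the line after the paragraph tag.
 */
```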

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
      *                          in the previous {@link Iterator#next()} call are closed before creating
      *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {

Review comment:
       This format is closer to what we would normally use for method arguments, but a more important rule is not making changes larger than necessary by changing whitespace. Could you remove the reformatting here and for other argument lists?

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
      *                          in the previous {@link Iterator#next()} call are closed before creating
      *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+              .map(CombinedScanTask::files)
+              .flatMap(Collection::stream)
+              .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
 
+      boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+      boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+      boolean hasSupportedTypes = expectedSchema.columns().stream()
+              .map(c -> c.type().typeId())
+              .allMatch(SUPPORTED_TYPES::contains);
+      if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+        throw new UnsupportedOperationException(
+                "ArrowReader is supported for the query schema with at least one column," +
+                        " with no delete files and for supported data types" +
+                        ", but found that atLeastOneColumn=" + atLeastOneColumn +
+                        ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+                        ", hasSupportedTypes=" + hasSupportedTypes +
+                        ", supported types=" + SUPPORTED_TYPES +

Review comment:
       This error message isn't very helpful: it checks several things and then mixes them together, so the person reading the error has to figure out something that is already known here, namely whether the failure was caused by delete files, unsupported types, or a missing projected column.
    
    This should be reformatted into 3 different checks with specific error messages, like "Cannot read files that require applying delete files: <split>", "Cannot read without at least one projected column", and "Cannot read unsupported column types: <unsupported-types>". The last one should produce a list of the types that are used but not supported, since that is already known.
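    
    A rough sketch of what I mean (this keeps `UnsupportedOperationException` from the current code and reuses `SUPPORTED_TYPES`, `TableScanUtil`, and the collected `fileTasks` from this PR; the exact exception type and wording are up to you):

```java
// Sketch: three independent checks, each failing with a message that names exactly what went wrong.
if (expectedSchema.columns().isEmpty()) {
  throw new UnsupportedOperationException("Cannot read without at least one projected column");
}

// Report only the types that are used but not supported.
Set<TypeID> unsupportedTypes = expectedSchema.columns().stream()
    .map(column -> column.type().typeId())
    .filter(typeId -> !SUPPORTED_TYPES.contains(typeId))
    .collect(Collectors.toSet());
if (!unsupportedTypes.isEmpty()) {
  throw new UnsupportedOperationException("Cannot read unsupported column types: " + unsupportedTypes);
}

for (FileScanTask fileTask : fileTasks) {
  if (TableScanUtil.hasDeletes(fileTask)) {
    throw new UnsupportedOperationException(
        "Cannot read files that require applying delete files: " + fileTask.file().path());
  }
}
```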

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
      *                          in the previous {@link Iterator#next()} call are closed before creating
      *                          new instances if the current {@link Iterator#next()}.
      */
-    VectorizedCombinedScanIterator(
-        CloseableIterable<CombinedScanTask> tasks,
-        Schema expectedSchema,
-        String nameMapping,
-        FileIO io,
-        EncryptionManager encryptionManager,
-        boolean caseSensitive,
-        int batchSize,
-        boolean reuseContainers) {
-      this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
-          .map(CombinedScanTask::files)
-          .flatMap(Collection::stream)
-          .collect(Collectors.toList());
+    VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+                                   Schema expectedSchema,
+                                   String nameMapping,
+                                   FileIO io,
+                                   EncryptionManager encryptionManager,
+                                   boolean caseSensitive,
+                                   int batchSize,
+                                   boolean reuseContainers) {
+      List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+              .map(CombinedScanTask::files)
+              .flatMap(Collection::stream)
+              .collect(Collectors.toList());
       this.fileItr = fileTasks.iterator();
 
+      boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+      boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+      boolean hasSupportedTypes = expectedSchema.columns().stream()
+              .map(c -> c.type().typeId())
+              .allMatch(SUPPORTED_TYPES::contains);
+      if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+        throw new UnsupportedOperationException(
+                "ArrowReader is supported for the query schema with at least one column," +
+                        " with no delete files and for supported data types" +
+                        ", but found that atLeastOneColumn=" + atLeastOneColumn +
+                        ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+                        ", hasSupportedTypes=" + hasSupportedTypes +
+                        ", supported types=" + SUPPORTED_TYPES +
+                        ", expected columns=" + expectedSchema.columns()
+        );
+      }
+
       Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
       fileTasks.stream()
-          .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+          .map(FileScanTask::file)
           .forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-

Review comment:
       Nit: unnecessary whitespace change.

##########
File path: 
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -252,17 +297,11 @@ public boolean hasNext() {
         }
       } catch (IOException | RuntimeException e) {
         if (currentTask != null && !currentTask.isDataTask()) {
-          throw new RuntimeException(
-              "Error reading file: " + getInputFile(currentTask).location() +
-                  ". Reason: the current task is not a data task, i.e. it cannot read data rows. " +
-                  "Ensure that the tasks passed to the constructor are data tasks. " +
-                  "The file scan tasks are: " + fileTasks,
-              e);
-        } else {
-          throw new RuntimeException(
-              "An error occurred while iterating through the file scan tasks or closing the iterator," +
-                  " see the stacktrace for further information. The file scan tasks are: " + fileTasks, e);
+          LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
         }
+        // Wrap and throw a Runtime exception because Iterator::hasNext()
+        // cannot throw a checked exception.
+        throw new RuntimeException(e);

Review comment:
       If the exception is already a `RuntimeException` then this should not wrap it in another because that may change the exception type. You can use `ExceptionUtil.castAndThrow(e, RuntimeException.class);` for this.
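    
    For example, the catch block could look roughly like this (a sketch; it assumes `ExceptionUtil` is `org.apache.iceberg.util.ExceptionUtil` and that `castAndThrow` always throws):

```java
} catch (IOException | RuntimeException e) {
  if (currentTask != null && !currentTask.isDataTask()) {
    LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
  }
  // Re-throws e directly when it is already a RuntimeException, so the original
  // exception type is preserved; only checked exceptions end up wrapped.
  ExceptionUtil.castAndThrow(e, RuntimeException.class);
  // Not expected to be reached; kept only so the compiler sees a terminating statement.
  throw new RuntimeException(e);
}
```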

##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -323,10 +362,9 @@ private InputFile getInputFile(FileScanTask task) {
      * @param fileSchema             Schema of the data file.
     * @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
      */
-    private static ArrowBatchReader buildReader(
-        Schema expectedSchema,
-        MessageType fileSchema,
-        boolean setArrowValidityVector) {
+    private static ArrowBatchReader buildReader(Schema expectedSchema,
+                                                MessageType fileSchema,
+                                                boolean setArrowValidityVector) {

Review comment:
       This looks like another change that doesn't need to be done.



