rdblue commented on a change in pull request #2933:
URL: https://github.com/apache/iceberg/pull/2933#discussion_r682724664
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -84,11 +92,26 @@
* {@link Types.StructType}, {@link Types.FixedType} and
* {@link Types.DecimalType}
* See https://github.com/apache/iceberg/issues/2485 and https://github.com/apache/iceberg/issues/2486.</li>
- * <li>Iceberg v2 spec is not supported.
+ * <li>Delete files are not supported.
* See https://github.com/apache/iceberg/issues/2487.</li>
* </ul>
*/
public class ArrowReader extends CloseableGroup {
+ private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class);
+
+ private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
+ TypeID.BOOLEAN,
+ TypeID.INTEGER,
+ TypeID.LONG,
+ TypeID.FLOAT,
+ TypeID.DOUBLE,
+ TypeID.STRING,
+ TypeID.TIMESTAMP,
+ TypeID.BINARY,
+ TypeID.DATE,
+ TypeID.UUID,
+ TypeID.TIME
Review comment:
Indentation is off throughout this PR. It should be 2 spaces for an
indent and 2 indents for a continuation. Here, you should use 4 spaces = 2
indents.
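For reference, a minimal sketch of that convention applied to the field above (2-space indents, so the continuation of the `ImmutableSet.of(...)` call is indented 4 spaces from the declaration); the abbreviated member list is illustrative:

    // continuation lines are 2 indents (4 spaces) in from the declaration
    private static final Set<TypeID> SUPPORTED_TYPES = ImmutableSet.of(
        TypeID.BOOLEAN,
        TypeID.INTEGER,
        TypeID.LONG);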
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -136,6 +159,14 @@ public ArrowReader(TableScan scan, int batchSize, boolean reuseContainers) {
* previous {@link ColumnarBatch} may be reused for the next {@link ColumnarBatch}.
* This implies that the caller should either use the {@link ColumnarBatch} or deep copy the
* {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
+ *
+ * <p>This method works for only when the following conditions are true:
Review comment:
Nit: `<p>` normally goes on the line between paragraphs.
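For example, the Javadoc would usually read like this (sentence text abbreviated from the hunk above):

    /**
     * ... the caller should either use the {@link ColumnarBatch} or deep copy the
     * {@link ColumnarBatch} before getting the next {@link ColumnarBatch}.
     * <p>
     * This method works only when the following conditions are true: ...
     */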
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
Review comment:
This format is closer to what we would normally use for method
arguments, but a more important rule is not making changes larger than
necessary by changing whitespace. Could you remove the reformatting here and
for other argument lists?
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+ List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ .map(CombinedScanTask::files)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
+ boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+ boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+ boolean hasSupportedTypes = expectedSchema.columns().stream()
+ .map(c -> c.type().typeId())
+ .allMatch(SUPPORTED_TYPES::contains);
+ if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+ throw new UnsupportedOperationException(
+ "ArrowReader is supported for the query schema with at least
one column," +
+ " with no delete files and for supported data types" +
+ ", but found that atLeastOneColumn=" +
atLeastOneColumn +
+ ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+ ", hasSupportedTypes=" + hasSupportedTypes +
+ ", supported types=" + SUPPORTED_TYPES +
Review comment:
This error message isn't very helpful because it checks several things
and then mixes the results together, so the person reading the error has to
figure out something that is already known here: whether the failure was
caused by delete files, unsupported types, or a missing projected column.
This should be reformatted into 3 different checks with specific error
messages, like "Cannot read files that require applying delete files: <split>",
"Cannot read without at least one projected column", and "Cannot read
unsupported column types: <unsupported-types>". The last one should produce a
list of the types that are used but not supported, since that is already known.
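A minimal sketch of those three checks, using the names from the hunk above
(fileTasks, expectedSchema, SUPPORTED_TYPES); the exact message wording and
the choice of UnsupportedOperationException are illustrative:

    // 1. fail fast if any task carries delete files, listing the offending tasks
    List<FileScanTask> tasksWithDeletes = fileTasks.stream()
        .filter(TableScanUtil::hasDeletes)
        .collect(Collectors.toList());
    if (!tasksWithDeletes.isEmpty()) {
      throw new UnsupportedOperationException(
          "Cannot read files that require applying delete files: " + tasksWithDeletes);
    }

    // 2. require at least one projected column
    if (expectedSchema.columns().isEmpty()) {
      throw new UnsupportedOperationException(
          "Cannot read without at least one projected column");
    }

    // 3. report only the types that are used but not supported
    Set<TypeID> unsupportedTypes = expectedSchema.columns().stream()
        .map(column -> column.type().typeId())
        .filter(typeId -> !SUPPORTED_TYPES.contains(typeId))
        .collect(Collectors.toSet());
    if (!unsupportedTypes.isEmpty()) {
      throw new UnsupportedOperationException(
          "Cannot read unsupported column types: " + unsupportedTypes);
    }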
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -197,26 +227,41 @@ public void close() throws IOException {
* in the previous {@link Iterator#next()} call are closed before creating
* new instances if the current {@link Iterator#next()}.
*/
- VectorizedCombinedScanIterator(
- CloseableIterable<CombinedScanTask> tasks,
- Schema expectedSchema,
- String nameMapping,
- FileIO io,
- EncryptionManager encryptionManager,
- boolean caseSensitive,
- int batchSize,
- boolean reuseContainers) {
- this.fileTasks = StreamSupport.stream(tasks.spliterator(), false)
- .map(CombinedScanTask::files)
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
+ VectorizedCombinedScanIterator(CloseableIterable<CombinedScanTask> tasks,
+ Schema expectedSchema,
+ String nameMapping,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ boolean caseSensitive,
+ int batchSize,
+ boolean reuseContainers) {
+ List<FileScanTask> fileTasks = StreamSupport.stream(tasks.spliterator(), false)
+ .map(CombinedScanTask::files)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
this.fileItr = fileTasks.iterator();
+ boolean atLeastOneColumn = expectedSchema.columns().size() > 0;
+ boolean hasNoDeleteFiles = fileTasks.stream().noneMatch(TableScanUtil::hasDeletes);
+ boolean hasSupportedTypes = expectedSchema.columns().stream()
+ .map(c -> c.type().typeId())
+ .allMatch(SUPPORTED_TYPES::contains);
+ if (!atLeastOneColumn || !hasNoDeleteFiles || !hasSupportedTypes) {
+ throw new UnsupportedOperationException(
+ "ArrowReader is supported for the query schema with at least
one column," +
+ " with no delete files and for supported data types" +
+ ", but found that atLeastOneColumn=" +
atLeastOneColumn +
+ ", hasNoDeleteFiles=" + hasNoDeleteFiles +
+ ", hasSupportedTypes=" + hasSupportedTypes +
+ ", supported types=" + SUPPORTED_TYPES +
+ ", expected columns=" + expectedSchema.columns()
+ );
+ }
+
Map<String, ByteBuffer> keyMetadata = Maps.newHashMap();
fileTasks.stream()
- .flatMap(fileScanTask -> Stream.concat(Stream.of(fileScanTask.file()), fileScanTask.deletes().stream()))
+ .map(FileScanTask::file)
.forEach(file -> keyMetadata.put(file.path().toString(), file.keyMetadata()));
-
Review comment:
Nit: unnecessary whitespace change.
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -252,17 +297,11 @@ public boolean hasNext() {
}
} catch (IOException | RuntimeException e) {
if (currentTask != null && !currentTask.isDataTask()) {
- throw new RuntimeException(
- "Error reading file: " + getInputFile(currentTask).location() +
- ". Reason: the current task is not a data task, i.e. it
cannot read data rows. " +
- "Ensure that the tasks passed to the constructor are data
tasks. " +
- "The file scan tasks are: " + fileTasks,
- e);
- } else {
- throw new RuntimeException(
- "An error occurred while iterating through the file scan tasks
or closing the iterator," +
- " see the stacktrace for further information. The file scan
tasks are: " + fileTasks, e);
+ LOG.error("Error reading file: {}",
getInputFile(currentTask).location(), e);
}
+ // Wrap and throw a Runtime exception because Iterator::hasNext()
+ // cannot throw a checked exception.
+ throw new RuntimeException(e);
Review comment:
If the exception is already a `RuntimeException` then this should not
wrap it in another because that may change the exception type. You can use
`ExceptionUtil.castAndThrow(e, RuntimeException.class);` for this.
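As a sketch, the catch block could then look like this (assuming Iceberg's
ExceptionUtil.castAndThrow rethrows an existing RuntimeException unchanged and
wraps checked exceptions; the trailing return is only there because the
compiler cannot see that castAndThrow always throws):

    } catch (IOException | RuntimeException e) {
      if (currentTask != null && !currentTask.isDataTask()) {
        LOG.error("Error reading file: {}", getInputFile(currentTask).location(), e);
      }
      // rethrows e unchanged if it is already a RuntimeException, otherwise
      // wraps it, since Iterator::hasNext() cannot throw a checked exception
      ExceptionUtil.castAndThrow(e, RuntimeException.class);
      return false; // unreachable
    }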
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
##########
@@ -323,10 +362,9 @@ private InputFile getInputFile(FileScanTask task) {
* @param fileSchema Schema of the data file.
* @param setArrowValidityVector Indicates whether to set the validity vector in Arrow vectors.
*/
- private static ArrowBatchReader buildReader(
- Schema expectedSchema,
- MessageType fileSchema,
- boolean setArrowValidityVector) {
+ private static ArrowBatchReader buildReader(Schema expectedSchema,
+ MessageType fileSchema,
+ boolean setArrowValidityVector) {
Review comment:
This looks like another change that doesn't need to be done.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]