FrankChen021 commented on code in PR #19534: URL: https://github.com/apache/druid/pull/19534#discussion_r3334891669
########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergNativeRecordReader.java: ########## @@ -0,0 +1,417 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSourceFactory; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputStats; +import org.apache.druid.data.input.MapBasedInputRow; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SchemaParser; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.parquet.GenericParquetReaders; +import org.apache.iceberg.io.CloseableIterable; +import 
org.apache.iceberg.io.DeleteSchemaUtil; +import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.types.Types; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * An {@link InputSourceReader} that reads one Iceberg data file and applies any associated + * position-delete and equality-delete files before converting records to Druid {@link InputRow}s. + * + * <p>The table schema is deserialized from a JSON string (captured on the coordinator at + * planning time) — no catalog interaction is needed for schema resolution on the worker. + * + * <p>File I/O is handled by {@link WarehouseFileIO} for local filesystem paths. For non-local + * paths (S3, HDFS) a fallback is made to the catalog's own {@link FileIO} (Phase 1 limitation; + * see {@code WarehouseFileIO} Javadoc for details). + * + * <p>This reader is constructed by {@link IcebergFileTaskInputSource#reader} and handles the + * Iceberg V2 delete semantics in a straightforward way: + * <ol> + * <li>Deserialize the table schema from the embedded JSON string.</li> + * <li>Resolve the {@link FileIO} to use for this data file's path.</li> + * <li>Load position delete files and build a set of row positions to skip.</li> + * <li>Load equality delete files and build sets of field-value maps to skip.</li> + * <li>Stream through the data file, filtering out any deleted rows.</li> + * <li>Convert surviving {@link Record}s to {@link MapBasedInputRow} via + * {@link IcebergRecordConverter}.</li> + * </ol> + * + * <p>Only Parquet format is supported for the initial implementation. 
+ */ +public class IcebergNativeRecordReader implements InputSourceReader +{ +  private static final Logger log = new Logger(IcebergNativeRecordReader.class); + +  private static final WarehouseFileIO WAREHOUSE_FILE_IO = new WarehouseFileIO(); Review Comment: [P1] Undefined FileIO class breaks compilation. The new reader instantiates `WarehouseFileIO`, but no class with that name is defined in this package or elsewhere in the repo, and there is no import that could resolve it. Any build that compiles this extension will fail before the V2 ingestion path can run. Use an existing Iceberg `FileIO` implementation or add the missing implementation. ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java: ########## @@ -121,8 +150,7 @@ public List<String> extractSnapshotDataFiles( } } - // Handle residual filter based on mode - if (detectedResidual != null) { + if (detectedResidual == null) { Review Comment: [P2] Residual-filter check is inverted. This condition now enters the residual-handling branch when `detectedResidual` is null, even though null means no residual was found. With `ResidualFilterMode.FAIL`, valid scans that have no residual, such as partition-pruned scans, will throw a residual-filter error, while scans that do have a residual skip the warning/failure path. Restore the check to run only when `detectedResidual != null`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
