kasakrisz commented on code in PR #5251:
URL: https://github.com/apache/hive/pull/5251#discussion_r1609663752
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -75,8 +76,13 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   @Override
   public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws IOException {
-    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
-    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, job, reporter);
+    if (split instanceof IcebergMergeSplit) {
+      IcebergMergeSplit mergeSplit = (IcebergMergeSplit) split;
+      return new MapredIcebergRecordReader<>(innerInputFormat, mergeSplit, job, reporter);

Review Comment:
   I haven't found any difference between creating a `MapredIcebergRecordReader` for an `IcebergMergeSplit` and for an `IcebergSplit`. How about
   ```
   return new MapredIcebergRecordReader<>(innerInputFormat, (InputSplit) split, job, reporter);
   ```

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;

     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-          conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+      this.table = HiveIcebergStorageHandler.table(conf, conf.get(InputFormatConfig.TABLE_IDENTIFIER));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
       this.nameMapping = table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, table, caseSensitive);
       this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
           InputFormatConfig.InMemoryDataModel.GENERIC);
       this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }
       this.currentIterator = nextTask();
     }

     private CloseableIterator<T> nextTask() {
-      CloseableIterator<T> closeableIterator = open(tasks.next(), expectedSchema).iterator();
-      if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
-        return closeableIterator;
+      if (isMerge) {
+        return open(mergeSplit.getContentFile(), table.schema()).iterator();

Review Comment:
   Please move this to a new class `MergeIcebergRecordReader`; a rough shape is sketched below.
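For illustration, a sketch of what such a dedicated reader could look like. This is not code from the PR: it assumes `IcebergRecordReader` is made non-final with subclass-visible fields, that its shared setup is factored into a hypothetical `initializeCommon()` helper, and that `nextTask()` is made overridable; `getContentFile()` and the `open(...)` overload are taken from the diff above.

```
// Sketch only, not code from the PR. Assumes IcebergRecordReader is no longer
// final, that its common setup (conf, table, expectedSchema, ...) lives in a
// hypothetical initializeCommon(), that nextTask() is overridable, and that
// the parent fields used here are visible to subclasses.
private static class MergeIcebergRecordReader<T> extends IcebergRecordReader<T> {

  private IcebergMergeSplit mergeSplit;

  @Override
  public void initialize(InputSplit split, TaskAttemptContext newContext) {
    initializeCommon(newContext);                 // hypothetical shared setup
    this.mergeSplit = (IcebergMergeSplit) split;
    this.currentIterator = nextTask();
  }

  @Override
  CloseableIterator<T> nextTask() {
    // No CombinedScanTask iterator here: a merge split carries a single
    // content file, read with the full table schema as in the diff above.
    return open(mergeSplit.getContentFile(), table.schema()).iterator();
  }
}
```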
##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }

Review Comment:
   Please move this check into https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L277-L279 and create the proper RecordReader instance based on the decision:
   * `IcebergMergeRecordReader`
   * or the original `IcebergRecordReader`

   (a dispatch along these lines is sketched at the end of this message)

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;

Review Comment:
   Please create a new class like `IcebergMergeRecordReader` and move everything related to merge into that class. You can extend the existing `IcebergRecordReader` if you can reuse the code, overriding methods where necessary.

##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -90,6 +96,12 @@ private static final class MapredIcebergRecordReader<T> extends AbstractMapredIcebergRecordReader
       splitLength = split.getLength();
     }

+    MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
+        IcebergMergeSplit split, JobConf job, Reporter reporter) throws IOException {
+      super(mapreduceInputFormat, split, job, reporter);
+      splitLength = split.getLength();
+    }
+

Review Comment:
   Why is this constructor necessary? It does exactly the same as the existing one at https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java#L87-L91 since `IcebergMergeSplit` and `IcebergSplit` have the same ancestor (`InputSplit`).
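To make that last point concrete, a sketch of a single constructor over the shared ancestor type that could replace both overloads. It mirrors the existing constructor linked above and is an illustration, not code from the PR; the `InterruptedException` wrapping is only needed because the mapreduce `InputSplit.getLength()` declares it.

```
// Hypothetical: one constructor accepting the common InputSplit ancestor serves
// both IcebergSplit and IcebergMergeSplit, so the merge-specific overload can go.
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> mapreduceInputFormat,
    org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter reporter) throws IOException {
  super(mapreduceInputFormat, split, job, reporter);
  try {
    splitLength = split.getLength();
  } catch (InterruptedException e) {
    // Declared by the mapreduce InputSplit ancestor; the concrete split
    // types here do not actually throw it.
    throw new IOException(e);
  }
}
```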
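Finally, the dispatch promised under the third comment: choosing the reader type in `IcebergInputFormat.createRecordReader()` (the method at the linked L277-L279) instead of branching inside `initialize()`. A sketch only; the merge reader name follows the comments above and is an assumption.

```
// Hypothetical dispatch: decide the reader type up front based on the split.
@Override
public RecordReader<Void, T> createRecordReader(InputSplit split, TaskAttemptContext context) {
  if (split instanceof IcebergMergeSplit) {
    return new IcebergMergeRecordReader<>();  // merge-specific reader
  }
  return new IcebergRecordReader<>();         // original task-iterating reader
}
```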