kasakrisz commented on code in PR #5251:
URL: https://github.com/apache/hive/pull/5251#discussion_r1609663752


##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -75,8 +76,13 @@ public InputSplit[] getSplits(JobConf job, int numSplits) 
throws IOException {
   @Override
   public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, 
JobConf job,
                                                           Reporter reporter) 
throws IOException {
-    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
-    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, 
job, reporter);
+    if (split instanceof IcebergMergeSplit) {
+      IcebergMergeSplit mergeSplit = (IcebergMergeSplit) split;
+      return new MapredIcebergRecordReader<>(innerInputFormat, mergeSplit, 
job, reporter);

Review Comment:
   I haven't found any difference between creating a 
`MapredIcebergRecordReader` in case of `IcebergMergeSplit` and `IcebergSplit`.
   
   How about 
   ```
         return new MapredIcebergRecordReader<>((InputSplit)split, mergeSplit, 
job, reporter);
   ```



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends 
RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-                conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + 
conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+      this.table = HiveIcebergStorageHandler.table(conf, 
conf.get(InputFormatConfig.TABLE_IDENTIFIER));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
       this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, 
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, table, caseSensitive);
       this.reuseContainers = 
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
               InputFormatConfig.InMemoryDataModel.GENERIC);
       this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }
       this.currentIterator = nextTask();
     }
 
     private CloseableIterator<T> nextTask() {
-      CloseableIterator<T> closeableIterator = open(tasks.next(), 
expectedSchema).iterator();
-      if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
-        return closeableIterator;
+      if (isMerge) {
+        return open(mergeSplit.getContentFile(), table.schema()).iterator();

Review Comment:
   Please move this to new class `MergeIcebergRecordReader`



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends 
RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;
 
     @Override
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-                conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + 
conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
+      this.table = HiveIcebergStorageHandler.table(conf, 
conf.get(InputFormatConfig.TABLE_IDENTIFIER));
       HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
       this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
       this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, 
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
       this.expectedSchema = readSchema(conf, table, caseSensitive);
       this.reuseContainers = 
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
       this.inMemoryDataModel = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
               InputFormatConfig.InMemoryDataModel.GENERIC);
       this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+      if (split instanceof IcebergMergeSplit) {
+        this.isMerge = true;
+        this.mergeSplit = (IcebergMergeSplit) split;
+      } else {
+        CombinedScanTask task = ((IcebergSplit) split).task();
+        this.tasks = task.files().iterator();
+      }

Review Comment:
   Please move this check into 
   
https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java#L277-L279
   and create the proper RecordReader instance based on the descision:
   * IcebergMergeRecordReader
   * or the original IcebergRecordReader



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java:
##########
@@ -312,33 +314,43 @@ private static final class IcebergRecordReader<T> extends 
RecordReader<Void, T>
     private CloseableIterator<T> currentIterator;
     private Table table;
     private boolean fetchVirtualColumns;
+    private boolean isMerge = false;
+    private IcebergMergeSplit mergeSplit;

Review Comment:
   Please create a new class like `IcebergMergeRecordReader` and move 
everything related to merge into that class.
   You can extend the existing `IcebergRecordReader` if you can reuse the code 
and override methods if necessary



##########
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java:
##########
@@ -90,6 +96,12 @@ private static final class MapredIcebergRecordReader<T> 
extends AbstractMapredIc
       splitLength = split.getLength();
     }
 
+    
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> 
mapreduceInputFormat,
+                              IcebergMergeSplit split, JobConf job, Reporter 
reporter) throws IOException {
+      super(mapreduceInputFormat, split, job, reporter);
+      splitLength = split.getLength();
+    }
+

Review Comment:
   Why does this constructor necessary? It does exactly the same as the 
existing one
   
https://github.com/apache/hive/blob/18c434f346dc590201afa4159aeec62b7dd5e2cf/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java#L87-L91
   
   `IcebergMergeSplit` and `IcebergSplit` has the same ancestor (`InputSplit`)
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org
For additional commands, e-mail: gitbox-h...@hive.apache.org

Reply via email to