This is an automated email from the ASF dual-hosted git repository.

sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 023c8b5ed54 HIVE-28258: Use Iceberg semantics for Merge task (#5251) 
(Sourabh Badhya reviewed by Krisztian Kasa, Denys Kuzmenko)
023c8b5ed54 is described below

commit 023c8b5ed54c49f166103428ab0b7d247df826c3
Author: Sourabh Badhya <[email protected]>
AuthorDate: Wed Jun 19 11:26:07 2024 +0530

    HIVE-28258: Use Iceberg semantics for Merge task (#5251) (Sourabh Badhya 
reviewed by Krisztian Kasa, Denys Kuzmenko)
---
 .../org/apache/iceberg/mr/hive/FilesForCommit.java |  16 +-
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |  12 +-
 .../mr/hive/HiveIcebergOutputCommitter.java        | 177 +++++++++--
 .../iceberg/mr/hive/HiveIcebergStorageHandler.java |  43 +--
 .../mr/hive/IcebergMergeTaskProperties.java        |  36 ++-
 .../apache/iceberg/mr/hive/IcebergTableUtil.java   |   1 +
 .../mapred/AbstractMapredIcebergRecordReader.java  |   4 +-
 .../mr/mapred/MapredIcebergInputFormat.java        |  14 +-
 .../mr/mapreduce/AbstractIcebergRecordReader.java  | 156 +++++++++
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   | 353 +--------------------
 .../mr/mapreduce/IcebergMergeRecordReader.java     | 169 ++++++++++
 .../{IcebergSplit.java => IcebergMergeSplit.java}  |  63 ++--
 .../iceberg/mr/mapreduce/IcebergRecordReader.java  | 308 ++++++++++++++++++
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  |   2 +-
 .../mr/mapreduce/IcebergSplitContainer.java        |   4 +-
 .../hadoop/hive/ql/io/CombineHiveInputFormat.java  |  18 +-
 .../hadoop/hive/ql/io/CombineHiveRecordReader.java |  31 +-
 .../ql/plan/ConditionalResolverMergeFiles.java     |  23 +-
 .../org/apache/hadoop/hive/ql/plan/MapWork.java    |  22 +-
 .../hadoop/hive/ql/plan/MergeTaskProperties.java   |   8 +-
 20 files changed, 930 insertions(+), 530 deletions(-)

diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
index 2e25f5a8c2e..1ca3872b424 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/FilesForCommit.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.hadoop.fs.Path;
 import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -35,27 +36,31 @@ public class FilesForCommit implements Serializable {
   private final Collection<DeleteFile> deleteFiles;
   private final Collection<DataFile> replacedDataFiles;
   private final Collection<CharSequence> referencedDataFiles;
+  private final Collection<Path> mergedAndDeletedFiles;
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> 
deleteFiles) {
     this(dataFiles, deleteFiles, Collections.emptyList());
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> 
deleteFiles,
-      Collection<DataFile> replacedDataFiles, Collection<CharSequence> 
referencedDataFiles) {
+      Collection<DataFile> replacedDataFiles, Collection<CharSequence> 
referencedDataFiles,
+      Collection<Path> mergedAndDeletedFiles) {
     this.dataFiles = dataFiles;
     this.deleteFiles = deleteFiles;
     this.replacedDataFiles = replacedDataFiles;
     this.referencedDataFiles = referencedDataFiles;
+    this.mergedAndDeletedFiles = mergedAndDeletedFiles;
   }
 
   public FilesForCommit(Collection<DataFile> dataFiles, Collection<DeleteFile> 
deleteFiles,
       Collection<DataFile> replacedDataFiles) {
-    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet());
+    this(dataFiles, deleteFiles, replacedDataFiles, Collections.emptySet(), 
Collections.emptySet());
   }
 
   public static FilesForCommit onlyDelete(Collection<DeleteFile> deleteFiles,
       Collection<CharSequence> referencedDataFiles) {
-    return new FilesForCommit(Collections.emptyList(), deleteFiles, 
Collections.emptyList(), referencedDataFiles);
+    return new FilesForCommit(Collections.emptyList(), deleteFiles, 
Collections.emptyList(),
+            referencedDataFiles, Collections.emptySet());
   }
 
   public static FilesForCommit onlyData(Collection<DataFile> dataFiles) {
@@ -86,6 +91,10 @@ public class FilesForCommit implements Serializable {
     return referencedDataFiles;
   }
 
+  public Collection<Path> mergedAndDeletedFiles() {
+    return mergedAndDeletedFiles;
+  }
+
   public Collection<? extends ContentFile> allFiles() {
     return Stream.concat(dataFiles.stream(), 
deleteFiles.stream()).collect(Collectors.toList());
   }
@@ -101,6 +110,7 @@ public class FilesForCommit implements Serializable {
         .add("deleteFiles", deleteFiles.toString())
         .add("replacedDataFiles", replacedDataFiles.toString())
         .add("referencedDataFiles", referencedDataFiles.toString())
+        .add("mergedAndDeletedFiles", mergedAndDeletedFiles.toString())
         .toString();
   }
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index 1ea78eeba54..98fd4b30163 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.TableName;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
@@ -56,6 +58,7 @@ import 
org.apache.iceberg.mr.mapred.AbstractMapredIcebergRecordReader;
 import org.apache.iceberg.mr.mapred.Container;
 import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
 import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
 import org.apache.iceberg.mr.mapreduce.IcebergSplit;
 import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -64,7 +67,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
-    implements CombineHiveInputFormat.AvoidSplitCombination, 
VectorizedInputFormatInterface,
+    implements CombineHiveInputFormat.MergeSplits, 
VectorizedInputFormatInterface,
     LlapCacheOnlyInputFormatInterface.VectorizedOnly {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergInputFormat.class);
@@ -225,4 +228,11 @@ public class HiveIcebergInputFormat extends 
MapredIcebergInputFormat<Record>
     String dbAndTableName = TableName.fromString(tableName, null, 
null).getNotEmptyDbTable();
     return ICEBERG_DISABLE_VECTORIZATION_PREFIX + dbAndTableName;
   }
+
+  @Override
+  public FileSplit createMergeSplit(Configuration conf,
+                                    
CombineHiveInputFormat.CombineHiveInputSplit split,
+                                    Integer partition, Properties properties) 
throws IOException {
+    return new IcebergMergeSplit(conf, split, partition, properties);
+  }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index d9f3116ff84..cb61d545b3d 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -52,12 +52,14 @@ import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.session.SessionStateUtil;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
 import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.DeleteFiles;
@@ -83,6 +85,7 @@ import 
org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.iceberg.util.Tasks;
@@ -97,6 +100,11 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
   private static final String FOR_COMMIT_EXTENSION = ".forCommit";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputCommitter.class);
+  private static final HiveIcebergOutputCommitter OUTPUT_COMMITTER = new 
HiveIcebergOutputCommitter();
+
+  public static HiveIcebergOutputCommitter getInstance() {
+    return OUTPUT_COMMITTER;
+  }
 
   @Override
   public void setupJob(JobContext jobContext) {
@@ -126,6 +134,7 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
 
     TaskAttemptID attemptID = context.getTaskAttemptID();
     JobConf jobConf = context.getJobConf();
+    Set<Path> mergedPaths = getCombinedLocations(jobConf);
     Set<String> outputs = 
HiveIcebergStorageHandler.outputTables(context.getJobConf());
     Map<String, List<HiveIcebergWriter>> writers = 
Optional.ofNullable(WriterRegistry.writers(attemptID))
         .orElseGet(() -> {
@@ -158,7 +167,8 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
                   replacedDataFiles.addAll(files.replacedDataFiles());
                   referencedDataFiles.addAll(files.referencedDataFiles());
                 }
-                createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, 
replacedDataFiles, referencedDataFiles),
+                createFileForCommit(new FilesForCommit(dataFiles, deleteFiles, 
replacedDataFiles, referencedDataFiles,
+                                mergedPaths),
                     fileForCommitLocation, table.io());
               } else {
                 LOG.info("CommitTask found no writer for specific table: {}, 
attemptID: {}", output, attemptID);
@@ -170,8 +180,6 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
               LOG.info("CommitTask found no serialized table in config for 
table: {}.", output);
             }
           }, IOException.class);
-
-      cleanMergeTaskInputFiles(jobConf, tableExecutor, context);
     } finally {
       if (tableExecutor != null) {
         tableExecutor.shutdown();
@@ -281,6 +289,9 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
                 .collect(Collectors.toList()));
             commitTable(table.io(), fileExecutor, output, operation);
           });
+
+      // Cleanup any merge input files.
+      cleanMergeTaskInputFiles(jobContextList, tableExecutor);
     } finally {
       fileExecutor.shutdown();
       if (tableExecutor != null) {
@@ -423,6 +434,7 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
     List<DeleteFile> deleteFiles = Lists.newArrayList();
     List<DataFile> replacedDataFiles = Lists.newArrayList();
     Set<CharSequence> referencedDataFiles = Sets.newHashSet();
+    Set<Path> mergedAndDeletedFiles = Sets.newHashSet();
 
     Table table = null;
     String branchName = null;
@@ -459,9 +471,14 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
       deleteFiles.addAll(writeResults.deleteFiles());
       replacedDataFiles.addAll(writeResults.replacedDataFiles());
       referencedDataFiles.addAll(writeResults.referencedDataFiles());
+      mergedAndDeletedFiles.addAll(writeResults.mergedAndDeletedFiles());
     }
 
-    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, 
replacedDataFiles, referencedDataFiles);
+    dataFiles.removeIf(dataFile -> mergedAndDeletedFiles.contains(new 
Path(String.valueOf(dataFile.path()))));
+    deleteFiles.removeIf(deleteFile -> mergedAndDeletedFiles.contains(new 
Path(String.valueOf(deleteFile.path()))));
+
+    FilesForCommit filesForCommit = new FilesForCommit(dataFiles, deleteFiles, 
replacedDataFiles, referencedDataFiles,
+            Collections.emptySet());
     long startTime = System.currentTimeMillis();
 
     if (Operation.IOW != operation) {
@@ -687,6 +704,7 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
     Collection<DeleteFile> deleteFiles = new ConcurrentLinkedQueue<>();
     Collection<DataFile> replacedDataFiles = new ConcurrentLinkedQueue<>();
     Collection<CharSequence> referencedDataFiles = new 
ConcurrentLinkedQueue<>();
+    Collection<Path> mergedAndDeletedFiles = new ConcurrentLinkedQueue<>();
     Tasks.range(numTasks)
         .throwFailureWhenFinished(throwOnFailure)
         .executeWith(executor)
@@ -698,9 +716,10 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
           deleteFiles.addAll(files.deleteFiles());
           replacedDataFiles.addAll(files.replacedDataFiles());
           referencedDataFiles.addAll(files.referencedDataFiles());
+          mergedAndDeletedFiles.addAll(files.mergedAndDeletedFiles());
         });
 
-    return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, 
referencedDataFiles);
+    return new FilesForCommit(dataFiles, deleteFiles, replacedDataFiles, 
referencedDataFiles, mergedAndDeletedFiles);
   }
 
   /**
@@ -747,11 +766,18 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
     }
   }
 
+  /**
+   * Generates a list of file statuses of the output files in the jobContexts.
+   * @param jobContexts List of jobContexts
+   * @return Returns the list of file statuses of the output files in the 
jobContexts
+   * @throws IOException Throws IOException
+   */
   public List<FileStatus> getOutputFiles(List<JobContext> jobContexts) throws 
IOException {
     List<OutputTable> outputs = collectOutputs(jobContexts);
     ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
     ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
-    Collection<FileStatus> dataFiles = new ConcurrentLinkedQueue<>();
+    Map<Path, List<FileStatus>> parentDirToDataFile = Maps.newConcurrentMap();
+    Map<Path, List<FileStatus>> parentDirToDeleteFile = 
Maps.newConcurrentMap();
     try {
       Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
                       .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext))))
@@ -773,8 +799,15 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
                 FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
                         table.io(), false);
                 for (DataFile dataFile : results.dataFiles()) {
-                  FileStatus fileStatus = fileSystem.getFileStatus(new 
Path(dataFile.path().toString()));
-                  dataFiles.add(fileStatus);
+                  Path filePath = new Path(dataFile.path().toString());
+                  FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+                  parentDirToDataFile.computeIfAbsent(filePath.getParent(), k 
-> Lists.newArrayList()).add(fileStatus);
+                }
+                for (DeleteFile deleteFile : results.deleteFiles()) {
+                  Path filePath = new Path(deleteFile.path().toString());
+                  FileStatus fileStatus = fileSystem.getFileStatus(filePath);
+                  parentDirToDeleteFile.computeIfAbsent(filePath.getParent(),
+                    k -> Lists.newArrayList()).add(fileStatus);
                 }
               }, IOException.class);
     } finally {
@@ -783,28 +816,126 @@ public class HiveIcebergOutputCommitter extends 
OutputCommitter {
         tableExecutor.shutdown();
       }
     }
-    return Lists.newArrayList(dataFiles);
+    List<FileStatus> dataFiles = Lists.newArrayList();
+    dataFiles.addAll(parentDirToDataFile.values().stream()
+        .flatMap(List::stream).collect(Collectors.toList()));
+    dataFiles.addAll(parentDirToDeleteFile.values().stream()
+        .flatMap(List::stream).collect(Collectors.toList()));
+    return dataFiles;
   }
 
-  private void cleanMergeTaskInputFiles(JobConf jobConf,
-                                        ExecutorService tableExecutor,
-                                        TaskAttemptContext context) throws 
IOException {
+  /**
+   * Generates a list of ContentFile objects of the output files in the 
jobContexts.
+   * @param jobContexts List of jobContexts
+   * @return Returns the list of ContentFile objects of the output files in the 
jobContexts
+   * @throws IOException Throws IOException
+   */
+  public List<ContentFile> getOutputContentFiles(List<JobContext> jobContexts) 
throws IOException {
+    List<OutputTable> outputs = collectOutputs(jobContexts);
+    ExecutorService fileExecutor = 
fileExecutor(jobContexts.get(0).getJobConf());
+    ExecutorService tableExecutor = 
tableExecutor(jobContexts.get(0).getJobConf(), outputs.size());
+    Collection<ContentFile> files = new ConcurrentLinkedQueue<>();
+    try {
+      Tasks.foreach(outputs.stream().flatMap(kv -> kv.jobContexts.stream()
+                      .map(jobContext -> new SimpleImmutableEntry<>(kv.table, 
jobContext))))
+              .suppressFailureWhenFinished()
+              .executeWith(tableExecutor)
+              .onFailure((output, exc) -> LOG.warn("Failed to retrieve merge 
input file for the table {}", output, exc))
+              .run(output -> {
+                JobContext jobContext = output.getValue();
+                JobConf jobConf = jobContext.getJobConf();
+                LOG.info("Cleaning job for jobID: {}, table: {}", 
jobContext.getJobID(), output);
+
+                Table table = output.getKey();
+                FileSystem fileSystem = new 
Path(table.location()).getFileSystem(jobConf);
+                String jobLocation = generateJobLocation(table.location(), 
jobConf, jobContext.getJobID());
+                // list jobLocation to get number of forCommit files
+                // we do this because map/reduce num in jobConf is unreliable
+                // and we have no access to vertex status info
+                int numTasks = listForCommits(jobConf, jobLocation).size();
+                FilesForCommit results = collectResults(numTasks, 
fileExecutor, table.location(), jobContext,
+                        table.io(), false);
+                files.addAll(results.dataFiles());
+                files.addAll(results.deleteFiles());
+              }, IOException.class);
+    } finally {
+      fileExecutor.shutdown();
+      if (tableExecutor != null) {
+        tableExecutor.shutdown();
+      }
+    }
+    return Lists.newArrayList(files);
+  }
+
+  private void cleanMergeTaskInputFiles(List<JobContext> jobContexts,
+                                        ExecutorService tableExecutor) throws 
IOException {
     // Merge task has merged several files into one. Hence we need to remove 
the stale files.
     // At this stage the file is written and task-committed, but the old files 
are still present.
+    for (JobContext jobContext : jobContexts) {
+      JobConf jobConf = jobContext.getJobConf();
+      if 
(jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
 {
+        MapWork mrwork = Utilities.getMapWork(jobConf);
+        if (mrwork != null) {
+          List<Path> mergedPaths = mrwork.getInputPaths();
+          if (mergedPaths != null) {
+            Tasks.foreach(mergedPaths)
+                    .retry(3)
+                    .executeWith(tableExecutor)
+                    .run(path -> {
+                      FileSystem fs = path.getFileSystem(jobConf);
+                      if (fs.exists(path)) {
+                        fs.delete(path, true);
+                      }
+                    }, IOException.class);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Generates {@link JobContext}s for the OutputCommitter for the specific 
table.
+   * @param configuration The configuration used as a base of the JobConf
+   * @param tableName The name of the table we are planning to commit
+   * @param branchName the name of the branch
+   * @return The generated JobContext list, or an empty list if no commit info is present.
+   */
+  static List<JobContext> generateJobContext(Configuration configuration, 
String tableName,
+                                             String branchName) {
+    JobConf jobConf = new JobConf(configuration);
+    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
+            SessionStateUtil.getCommitInfo(jobConf, tableName);
+    if (commitInfoMap.isPresent()) {
+      List<JobContext> jobContextList = Lists.newLinkedList();
+      for (SessionStateUtil.CommitInfo commitInfo : 
commitInfoMap.get().values()) {
+        org.apache.hadoop.mapred.JobID jobID = 
org.apache.hadoop.mapred.JobID.forName(commitInfo.getJobIdStr());
+        commitInfo.getProps().forEach(jobConf::set);
+
+        // we should only commit this current table because
+        // for multi-table inserts, this hook method will be called 
sequentially for each target table
+        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
+        if (branchName != null) {
+          jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
+        }
+
+        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
+      }
+      return jobContextList;
+    } else {
+      // most likely empty write scenario
+      LOG.debug("Unable to find commit information in query state for table: 
{}", tableName);
+      return Collections.emptyList();
+    }
+  }
+
+  private Set<Path> getCombinedLocations(JobConf jobConf) {
+    Set<Path> mergedPaths = Sets.newHashSet();
     if 
(jobConf.getInputFormat().getClass().isAssignableFrom(CombineHiveInputFormat.class))
 {
       MapWork mrwork = Utilities.getMapWork(jobConf);
-      if (mrwork != null) {
-        List<Path> mergedPaths = mrwork.getInputPaths();
-        if (mergedPaths != null) {
-          Tasks.foreach(mergedPaths)
-                  .retry(3)
-                  .executeWith(tableExecutor)
-                  .run(path -> {
-                    FileSystem fs = path.getFileSystem(context.getJobConf());
-                    fs.delete(path, true);
-                  }, IOException.class);
-        }
+      if (mrwork != null && mrwork.getInputPaths() != null) {
+        mergedPaths.addAll(mrwork.getInputPaths());
       }
     }
+    return mergedPaths;
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
index 3157fbb0411..7ccb4bfb3bf 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergStorageHandler.java
@@ -132,8 +132,6 @@ import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.JobContextImpl;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
@@ -763,7 +761,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     if (location != null) {
       HiveTableUtil.cleanupTableObjectFile(location, configuration);
     }
-    List<JobContext> jobContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+    List<JobContext> jobContextList = HiveIcebergOutputCommitter
+            .generateJobContext(configuration, tableName, snapshotRef);
     if (jobContextList.isEmpty()) {
       return;
     }
@@ -1569,41 +1568,6 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     return true;
   }
 
-  /**
-   * Generates {@link JobContext}s for the OutputCommitter for the specific 
table.
-   * @param configuration The configuration used for as a base of the JobConf
-   * @param tableName The name of the table we are planning to commit
-   * @param branchName the name of the branch
-   * @return The generated Optional JobContext list or empty if not presents.
-   */
-  private List<JobContext> generateJobContext(Configuration configuration, 
String tableName,
-      String branchName) {
-    JobConf jobConf = new JobConf(configuration);
-    Optional<Map<String, SessionStateUtil.CommitInfo>> commitInfoMap =
-        SessionStateUtil.getCommitInfo(jobConf, tableName);
-    if (commitInfoMap.isPresent()) {
-      List<JobContext> jobContextList = Lists.newLinkedList();
-      for (SessionStateUtil.CommitInfo commitInfo : 
commitInfoMap.get().values()) {
-        JobID jobID = JobID.forName(commitInfo.getJobIdStr());
-        commitInfo.getProps().forEach(jobConf::set);
-
-        // we should only commit this current table because
-        // for multi-table inserts, this hook method will be called 
sequentially for each target table
-        jobConf.set(InputFormatConfig.OUTPUT_TABLES, tableName);
-        if (branchName != null) {
-          jobConf.set(InputFormatConfig.OUTPUT_TABLE_SNAPSHOT_REF, branchName);
-        }
-
-        jobContextList.add(new JobContextImpl(jobConf, jobID, null));
-      }
-      return jobContextList;
-    } else {
-      // most likely empty write scenario
-      LOG.debug("Unable to find commit information in query state for table: 
{}", tableName);
-      return Collections.emptyList();
-    }
-  }
-
   private String getOperationType() {
     return SessionStateUtil.getProperty(conf, Operation.class.getSimpleName())
         .orElse(Operation.OTHER.name());
@@ -2172,7 +2136,8 @@ public class HiveIcebergStorageHandler implements 
HiveStoragePredicateHandler, H
     String tableName = properties.getProperty(Catalogs.NAME);
     String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
     Configuration configuration = SessionState.getSessionConf();
-    List<JobContext> originalContextList = generateJobContext(configuration, 
tableName, snapshotRef);
+    List<JobContext> originalContextList = HiveIcebergOutputCommitter
+            .generateJobContext(configuration, tableName, snapshotRef);
     List<JobContext> jobContextList = originalContextList.stream()
             .map(TezUtil::enrichContextWithVertexId)
             .collect(Collectors.toList());
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
index ff47a801a5b..e3e6df6ceba 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergMergeTaskProperties.java
@@ -19,19 +19,20 @@
 package org.apache.iceberg.mr.hive;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Properties;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
-import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
 import org.apache.hadoop.hive.ql.plan.MergeTaskProperties;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.TableProperties;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.mr.Catalogs;
 
 public class IcebergMergeTaskProperties implements MergeTaskProperties {
 
   private final Properties properties;
-  private static final StorageFormatFactory storageFormatFactory = new 
StorageFormatFactory();
 
   IcebergMergeTaskProperties(Properties properties) {
     this.properties = properties;
@@ -42,14 +43,23 @@ public class IcebergMergeTaskProperties implements 
MergeTaskProperties {
     return new Path(location + "/data/");
   }
 
-  public StorageFormatDescriptor getStorageFormatDescriptor() throws 
IOException {
-    FileFormat fileFormat = 
FileFormat.fromString(properties.getProperty(TableProperties.DEFAULT_FILE_FORMAT,
-            TableProperties.DEFAULT_FILE_FORMAT_DEFAULT));
-    StorageFormatDescriptor descriptor = 
storageFormatFactory.get(fileFormat.name());
-    if (descriptor == null) {
-      throw new IOException("Unsupported storage format descriptor");
+  @Override
+  public Properties getSplitProperties() throws IOException {
+    String tableName = properties.getProperty(Catalogs.NAME);
+    String snapshotRef = properties.getProperty(Catalogs.SNAPSHOT_REF);
+    Configuration configuration = SessionState.getSessionConf();
+    List<JobContext> originalContextList = HiveIcebergOutputCommitter
+        .generateJobContext(configuration, tableName, snapshotRef);
+    List<JobContext> jobContextList = originalContextList.stream()
+            .map(TezUtil::enrichContextWithVertexId)
+            .collect(Collectors.toList());
+    if (jobContextList.isEmpty()) {
+      return null;
     }
-    return descriptor;
+    List<ContentFile> contentFiles = 
HiveIcebergOutputCommitter.getInstance().getOutputContentFiles(jobContextList);
+    Properties pathToContentFile = new Properties();
+    contentFiles.forEach(contentFile ->
+        pathToContentFile.put(new Path(String.valueOf(contentFile.path())), 
contentFile));
+    return pathToContentFile;
   }
-
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
index 79454f45ee8..653372d7988 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -344,4 +344,5 @@ public class IcebergTableUtil {
     }
     return data;
   }
+
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
index 92e9c4a688d..2e892900d02 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/AbstractMapredIcebergRecordReader.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.iceberg.mr.mapreduce.IcebergSplit;
 
 @SuppressWarnings("checkstyle:VisibilityModifier")
 public abstract class AbstractMapredIcebergRecordReader<T> implements 
RecordReader<Void, T> {
@@ -32,7 +32,7 @@ public abstract class AbstractMapredIcebergRecordReader<T> 
implements RecordRead
   protected final org.apache.hadoop.mapreduce.RecordReader<Void, ?> 
innerReader;
 
   public 
AbstractMapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<?>
 mapreduceInputFormat,
-      IcebergSplit split, JobConf job, Reporter reporter) throws IOException {
+      InputSplit split, JobConf job, Reporter reporter) throws IOException {
     TaskAttemptContext context = 
MapredIcebergInputFormat.newTaskAttemptContext(job, reporter);
 
     try {
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
index 822ce52a7e4..473e8fe1486 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -37,7 +37,7 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergMergeSplit;
 import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
 
 /**
@@ -75,8 +75,13 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
   @Override
   public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, 
JobConf job,
                                                           Reporter reporter) 
throws IOException {
-    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
-    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, 
job, reporter);
+    try {
+      org.apache.hadoop.mapreduce.InputSplit inputSplit = split instanceof 
IcebergMergeSplit ?
+              (IcebergMergeSplit) split : ((IcebergSplitContainer) 
split).icebergSplit();
+      return new MapredIcebergRecordReader<>(innerInputFormat, inputSplit, 
job, reporter);
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
 
@@ -85,7 +90,8 @@ public class MapredIcebergInputFormat<T> implements 
InputFormat<Void, Container<
     private final long splitLength; // for getPos()
 
     
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> 
mapreduceInputFormat,
-                              IcebergSplit split, JobConf job, Reporter 
reporter) throws IOException {
+        org.apache.hadoop.mapreduce.InputSplit split, JobConf job, Reporter 
reporter)
+        throws IOException, InterruptedException {
       super(mapreduceInputFormat, split, job, reporter);
       splitLength = split.getLength();
     }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
new file mode 100644
index 00000000000..53ee0d870f8
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/AbstractIcebergRecordReader.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.InternalRecordWrapper;
+import org.apache.iceberg.expressions.Evaluator;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
+
+public abstract class AbstractIcebergRecordReader<T> extends 
RecordReader<Void, T> {
+
+  private TaskAttemptContext context;
+  private Configuration conf;
+  private Table table;
+  private Schema expectedSchema;
+  private String nameMapping;
+  private boolean reuseContainers;
+  private boolean caseSensitive;
+  private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
+  private boolean fetchVirtualColumns;
+
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext newContext) {
+    // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
+    this.context = newContext;
+    this.conf = newContext.getConfiguration();
+    this.table = HiveIcebergStorageHandler.table(conf, 
conf.get(InputFormatConfig.TABLE_IDENTIFIER));
+    HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
+    this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
+    this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, 
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
+    this.expectedSchema = readSchema(conf, table, caseSensitive);
+    this.reuseContainers = conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, 
false);
+    this.inMemoryDataModel = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
+      InputFormatConfig.InMemoryDataModel.GENERIC);
+    this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
+  }
+
+  private static Schema readSchema(Configuration conf, Table table, boolean 
caseSensitive) {
+    Schema readSchema = InputFormatConfig.readSchema(conf);
+
+    if (readSchema != null) {
+      return readSchema;
+    }
+
+    String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
+    readSchema = table.schema();
+
+    if (selectedColumns != null) {
+      readSchema =
+        caseSensitive ? readSchema.select(selectedColumns) : 
readSchema.caseInsensitiveSelect(selectedColumns);
+    }
+
+    if (InputFormatConfig.fetchVirtualColumns(conf)) {
+      return 
IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), 
table);
+    }
+
+    return readSchema;
+  }
+
+  CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> iter, 
Expression residual,
+                                              Schema readSchema) {
+    boolean applyResidual = !getContext().getConfiguration()
+        .getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
+
+    if (applyResidual && residual != null && residual != 
Expressions.alwaysTrue()) {
+      // Date and timestamp values are not the correct type for Evaluator.
+      // Wrapping to return the expected type.
+      InternalRecordWrapper wrapper = new 
InternalRecordWrapper(readSchema.asStruct());
+      Evaluator filter = new Evaluator(readSchema.asStruct(), residual, 
caseSensitive);
+      return CloseableIterable.filter(iter, record -> 
filter.eval(wrapper.wrap((StructLike) record)));
+    } else {
+      return iter;
+    }
+  }
+
+  public TaskAttemptContext getContext() {
+    return context;
+  }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public boolean isReuseContainers() {
+    return reuseContainers;
+  }
+
+  public Schema getExpectedSchema() {
+    return expectedSchema;
+  }
+
+  public String getNameMapping() {
+    return nameMapping;
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public InputFormatConfig.InMemoryDataModel getInMemoryDataModel() {
+    return inMemoryDataModel;
+  }
+
+  public boolean isFetchVirtualColumns() {
+    return fetchVirtualColumns;
+  }
+
+  public boolean isCaseSensitive() {
+    return caseSensitive;
+  }
+
+  @Override
+  public Void getCurrentKey() {
+    return null;
+  }
+
+  @Override
+  public float getProgress() {
+    // TODO: We could give a more accurate progress based on records read from 
the file. Context.getProgress does not
+    // have enough information to give an accurate progress value. This isn't 
that easy, since we don't know how much
+    // of the input split has been processed and we are pushing filters into 
Parquet and ORC. But we do know when a
+    // file is opened and could count the number of rows returned, so we can 
estimate. And we could also add a row
+    // count to the readers so that we can get an accurate count of rows that 
have been either returned or filtered
+    // out.
+    return getContext().getProgress();
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index efb3988688d..00e63740e28 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -21,23 +21,15 @@ package org.apache.iceberg.mr.mapreduce;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
-import java.util.Collections;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.LlapHiveUtils;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -49,54 +41,23 @@ import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataTableScan;
-import org.apache.iceberg.DataTask;
-import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
-import org.apache.iceberg.MetadataColumns;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SnapshotRef;
-import org.apache.iceberg.StructLike;
 import org.apache.iceberg.SystemConfigs;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.common.DynMethods;
-import org.apache.iceberg.data.DeleteFilter;
-import org.apache.iceberg.data.GenericDeleteFilter;
-import org.apache.iceberg.data.IdentityPartitionConverters;
-import org.apache.iceberg.data.InternalRecordWrapper;
-import org.apache.iceberg.data.avro.DataReader;
-import org.apache.iceberg.data.orc.GenericOrcReader;
-import org.apache.iceberg.data.parquet.GenericParquetReaders;
-import org.apache.iceberg.encryption.EncryptedFiles;
-import org.apache.iceberg.expressions.Evaluator;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hive.HiveVersion;
 import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.io.InputFile;
-import org.apache.iceberg.mapping.NameMappingParser;
 import org.apache.iceberg.mr.Catalogs;
 import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
 import org.apache.iceberg.mr.hive.HiveIcebergStorageHandler;
-import org.apache.iceberg.mr.hive.IcebergAcidUtil;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
-import org.apache.iceberg.types.Type;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.apache.iceberg.util.PartitionUtil;
 import org.apache.iceberg.util.SerializationUtil;
 import org.apache.iceberg.util.ThreadPools;
 
@@ -293,319 +254,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
 
   @Override
   public RecordReader<Void, T> createRecordReader(InputSplit split, 
TaskAttemptContext context) {
-    return new IcebergRecordReader<>();
+    return split instanceof IcebergMergeSplit ? new 
IcebergMergeRecordReader<>() : new IcebergRecordReader<>();
   }
 
-  private static final class IcebergRecordReader<T> extends RecordReader<Void, 
T> {
-
-    private static final String HIVE_VECTORIZED_READER_CLASS = 
"org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
-    private static final DynMethods.StaticMethod 
HIVE_VECTORIZED_READER_BUILDER;
-
-    static {
-      if (HiveVersion.min(HiveVersion.HIVE_3)) {
-        HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
-            .impl(HIVE_VECTORIZED_READER_CLASS,
-                Table.class,
-                Path.class,
-                FileScanTask.class,
-                Map.class,
-                TaskAttemptContext.class,
-                Expression.class,
-                Schema.class)
-            .buildStatic();
-      } else {
-        HIVE_VECTORIZED_READER_BUILDER = null;
-      }
-    }
-
-    private TaskAttemptContext context;
-    private Configuration conf;
-    private Schema expectedSchema;
-    private String nameMapping;
-    private boolean reuseContainers;
-    private boolean caseSensitive;
-    private InputFormatConfig.InMemoryDataModel inMemoryDataModel;
-    private Iterator<FileScanTask> tasks;
-    private T current;
-    private CloseableIterator<T> currentIterator;
-    private Table table;
-    private boolean fetchVirtualColumns;
-
-    @Override
-    public void initialize(InputSplit split, TaskAttemptContext newContext) {
-      // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task();
-      this.context = newContext;
-      this.conf = newContext.getConfiguration();
-      this.table = SerializationUtil.deserializeFromBase64(
-                conf.get(InputFormatConfig.SERIALIZED_TABLE_PREFIX + 
conf.get(InputFormatConfig.TABLE_IDENTIFIER)));
-      HiveIcebergStorageHandler.checkAndSetIoConfig(conf, table);
-      this.tasks = task.files().iterator();
-      this.nameMapping = 
table.properties().get(TableProperties.DEFAULT_NAME_MAPPING);
-      this.caseSensitive = conf.getBoolean(InputFormatConfig.CASE_SENSITIVE, 
InputFormatConfig.CASE_SENSITIVE_DEFAULT);
-      this.expectedSchema = readSchema(conf, table, caseSensitive);
-      this.reuseContainers = 
conf.getBoolean(InputFormatConfig.REUSE_CONTAINERS, false);
-      this.inMemoryDataModel = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
-              InputFormatConfig.InMemoryDataModel.GENERIC);
-      this.fetchVirtualColumns = InputFormatConfig.fetchVirtualColumns(conf);
-      this.currentIterator = nextTask();
-    }
-
-    private CloseableIterator<T> nextTask() {
-      CloseableIterator<T> closeableIterator = open(tasks.next(), 
expectedSchema).iterator();
-      if (!fetchVirtualColumns || Utilities.getIsVectorized(conf)) {
-        return closeableIterator;
-      }
-      return new 
IcebergAcidUtil.VirtualColumnAwareIterator<T>(closeableIterator, 
expectedSchema, conf);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException {
-      while (true) {
-        if (currentIterator.hasNext()) {
-          current = currentIterator.next();
-          return true;
-        } else if (tasks.hasNext()) {
-          currentIterator.close();
-          this.currentIterator = nextTask();
-        } else {
-          currentIterator.close();
-          return false;
-        }
-      }
-    }
-
-    @Override
-    public Void getCurrentKey() {
-      return null;
-    }
-
-    @Override
-    public T getCurrentValue() {
-      return current;
-    }
-
-    @Override
-    public float getProgress() {
-      // TODO: We could give a more accurate progress based on records read 
from the file. Context.getProgress does not
-      // have enough information to give an accurate progress value. This 
isn't that easy, since we don't know how much
-      // of the input split has been processed and we are pushing filters into 
Parquet and ORC. But we do know when a
-      // file is opened and could count the number of rows returned, so we can 
estimate. And we could also add a row
-      // count to the readers so that we can get an accurate count of rows 
that have been either returned or filtered
-      // out.
-      return context.getProgress();
-    }
-
-    @Override
-    public void close() throws IOException {
-      currentIterator.close();
-    }
-
-    private CloseableIterable<T> openVectorized(FileScanTask task, Schema 
readSchema) {
-      
Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
-          "Vectorized execution is not yet supported for Iceberg avro tables. 
" +
-              "Please turn off vectorization and retry the query.");
-      Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3),
-          "Vectorized read is unsupported for Hive 2 integration.");
-
-      Path path = new Path(task.file().path().toString());
-      Map<Integer, ?> idToConstant = constantsMap(task, 
HiveIdentityPartitionConverters::convertConstant);
-      Expression residual = HiveIcebergInputFormat.residualForTask(task, 
context.getConfiguration());
-
-      // TODO: We have to take care of the EncryptionManager when LLAP and 
vectorization is used
-      CloseableIterable<T> iterator = 
HIVE_VECTORIZED_READER_BUILDER.invoke(table, path, task,
-          idToConstant, context, residual, readSchema);
-
-      return applyResidualFiltering(iterator, residual, readSchema);
-    }
-
-    private CloseableIterable<T> openGeneric(FileScanTask task, Schema 
readSchema) {
-      if (task.isDataTask()) {
-        // When querying metadata tables, the currentTask is a DataTask and 
the data has to
-        // be fetched from the task instead of reading it from files.
-        IcebergInternalRecordWrapper wrapper =
-            new IcebergInternalRecordWrapper(table.schema().asStruct(), 
readSchema.asStruct());
-        return (CloseableIterable) CloseableIterable.transform(((DataTask) 
task).rows(), row -> wrapper.wrap(row));
-      }
-
-      DataFile file = task.file();
-      InputFile inputFile = 
table.encryption().decrypt(EncryptedFiles.encryptedInput(
-          table.io().newInputFile(file.path().toString()),
-          file.keyMetadata()));
-
-      CloseableIterable<T> iterable;
-      switch (file.format()) {
-        case AVRO:
-          iterable = newAvroIterable(inputFile, task, readSchema);
-          break;
-        case ORC:
-          iterable = newOrcIterable(inputFile, task, readSchema);
-          break;
-        case PARQUET:
-          iterable = newParquetIterable(inputFile, task, readSchema);
-          break;
-        default:
-          throw new UnsupportedOperationException(
-              String.format("Cannot read %s file: %s", file.format().name(), 
file.path()));
-      }
-
-      return iterable;
-    }
-
-    @SuppressWarnings("unchecked")
-    private CloseableIterable<T> open(FileScanTask currentTask, Schema 
readSchema) {
-      switch (inMemoryDataModel) {
-        case PIG:
-          // TODO: Support Pig and Hive object models for IcebergInputFormat
-          throw new UnsupportedOperationException("Pig and Hive object models 
are not supported.");
-        case HIVE:
-          return openVectorized(currentTask, readSchema);
-        case GENERIC:
-          DeleteFilter deletes = new GenericDeleteFilter(table.io(), 
currentTask, table.schema(), readSchema);
-          Schema requiredSchema = deletes.requiredSchema();
-          return deletes.filter(openGeneric(currentTask, requiredSchema));
-        default:
-          throw new UnsupportedOperationException("Unsupported memory model");
-      }
-    }
-
-    private CloseableIterable<T> applyResidualFiltering(CloseableIterable<T> 
iter, Expression residual,
-                                                        Schema readSchema) {
-      boolean applyResidual = 
!context.getConfiguration().getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING,
 false);
-
-      if (applyResidual && residual != null && residual != 
Expressions.alwaysTrue()) {
-        // Date and timestamp values are not the correct type for Evaluator.
-        // Wrapping to return the expected type.
-        InternalRecordWrapper wrapper = new 
InternalRecordWrapper(readSchema.asStruct());
-        Evaluator filter = new Evaluator(readSchema.asStruct(), residual, 
caseSensitive);
-        return CloseableIterable.filter(iter, record -> 
filter.eval(wrapper.wrap((StructLike) record)));
-      } else {
-        return iter;
-      }
-    }
-
-    private CloseableIterable<T> newAvroIterable(
-        InputFile inputFile, FileScanTask task, Schema readSchema) {
-      Expression residual = HiveIcebergInputFormat.residualForTask(task, 
context.getConfiguration());
-      Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
-          .project(readSchema)
-          .split(task.start(), task.length());
-
-      if (reuseContainers) {
-        avroReadBuilder.reuseContainers();
-      }
-
-      if (nameMapping != null) {
-        
avroReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      avroReadBuilder.createReaderFunc(
-          (expIcebergSchema, expAvroSchema) ->
-              DataReader.create(expIcebergSchema, expAvroSchema,
-                  constantsMap(task, 
IdentityPartitionConverters::convertConstant)));
-
-      return applyResidualFiltering(avroReadBuilder.build(), residual, 
readSchema);
-    }
-
-    private CloseableIterable<T> newParquetIterable(InputFile inputFile, 
FileScanTask task, Schema readSchema) {
-      Expression residual = HiveIcebergInputFormat.residualForTask(task, 
context.getConfiguration());
-
-      Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
-          .project(readSchema)
-          .filter(residual)
-          .caseSensitive(caseSensitive)
-          .split(task.start(), task.length());
-
-      if (reuseContainers) {
-        parquetReadBuilder.reuseContainers();
-      }
-
-      if (nameMapping != null) {
-        
parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      parquetReadBuilder.createReaderFunc(
-          fileSchema -> GenericParquetReaders.buildReader(
-              readSchema, fileSchema, constantsMap(task, 
IdentityPartitionConverters::convertConstant)));
-
-      return applyResidualFiltering(parquetReadBuilder.build(), residual, 
readSchema);
-    }
-
-    private CloseableIterable<T> newOrcIterable(InputFile inputFile, 
FileScanTask task, Schema readSchema) {
-      Map<Integer, ?> idToConstant = constantsMap(task, 
IdentityPartitionConverters::convertConstant);
-      Schema readSchemaWithoutConstantAndMetadataFields = 
schemaWithoutConstantsAndMeta(readSchema, idToConstant);
-      Expression residual = HiveIcebergInputFormat.residualForTask(task, 
context.getConfiguration());
-
-      ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
-          .project(readSchemaWithoutConstantAndMetadataFields)
-          .filter(residual)
-          .caseSensitive(caseSensitive)
-          .split(task.start(), task.length());
-
-      if (nameMapping != null) {
-        
orcReadBuilder.withNameMapping(NameMappingParser.fromJson(nameMapping));
-      }
-
-      orcReadBuilder.createReaderFunc(
-          fileSchema -> GenericOrcReader.buildReader(
-              readSchema, fileSchema, idToConstant));
-
-      return applyResidualFiltering(orcReadBuilder.build(), residual, 
readSchema);
-    }
-
-    private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, 
Object, Object> converter) {
-      PartitionSpec spec = task.spec();
-      Set<Integer> idColumns = spec.identitySourceIds();
-      Schema partitionSchema = TypeUtil.select(expectedSchema, idColumns);
-      boolean projectsIdentityPartitionColumns = 
!partitionSchema.columns().isEmpty();
-      if (expectedSchema.findField(MetadataColumns.PARTITION_COLUMN_ID) != 
null) {
-        Types.StructType partitionType = Partitioning.partitionType(table);
-        return PartitionUtil.constantsMap(task, partitionType, converter);
-      } else if (projectsIdentityPartitionColumns) {
-        Types.StructType partitionType = Partitioning.partitionType(table);
-        return PartitionUtil.constantsMap(task, partitionType, converter);
-      } else {
-        return Collections.emptyMap();
-      }
-    }
-
-    private static Schema readSchema(Configuration conf, Table table, boolean 
caseSensitive) {
-      Schema readSchema = InputFormatConfig.readSchema(conf);
-
-      if (readSchema != null) {
-        return readSchema;
-      }
-
-      String[] selectedColumns = InputFormatConfig.selectedColumns(conf);
-      readSchema = table.schema();
-
-      if (selectedColumns != null) {
-        readSchema =
-            caseSensitive ? readSchema.select(selectedColumns) : 
readSchema.caseInsensitiveSelect(selectedColumns);
-      }
-
-      if (InputFormatConfig.fetchVirtualColumns(conf)) {
-        return 
IcebergAcidUtil.createFileReadSchemaWithVirtualColums(readSchema.columns(), 
table);
-      }
-
-      return readSchema;
-    }
-
-    private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, 
Map<Integer, ?> idToConstant) {
-      // remove the nested fields of the partition struct
-      Set<Integer> partitionFields = 
Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID))
-          .map(Types.NestedField::type)
-          .map(Type::asStructType)
-          .map(Types.StructType::fields)
-          .map(fields -> 
fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()))
-          .orElseGet(Collections::emptySet);
-
-      // remove constants and meta columns too
-      Set<Integer> collect = Stream.of(idToConstant.keySet(), 
MetadataColumns.metadataFieldIds(), partitionFields)
-          .flatMap(Set::stream)
-          .collect(Collectors.toSet());
-
-      return TypeUtil.selectNot(readSchema, collect);
-    }
-  }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
new file mode 100644
index 00000000000..60995a1988d
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeRecordReader.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.IOException;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+
+/**
+ * Record reader for Iceberg merge splits. Each instance reads the single content file carried by an
+ * {@link IcebergMergeSplit}, projected on the schema returned by {@code getTable().schema()}.
+ *
+ * @param <T> in-memory representation of a row
+ */
+public final class IcebergMergeRecordReader<T> extends AbstractIcebergRecordReader<T> {
+  private IcebergMergeSplit mergeSplit;
+  private CloseableIterator<T> currentIterator;
+  private T current;
+
+  /**
+   * Initializes the parent reader state and opens the content file referenced by the split.
+   *
+   * @param split the split to read; must be an {@link IcebergMergeSplit}
+   * @param newContext task attempt context, forwarded to the parent class
+   */
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext newContext) {
+    super.initialize(split, newContext);
+    // Only IcebergMergeSplit (a FileSplit subclass) is supported; anything else fails the cast below.
+    mergeSplit = (IcebergMergeSplit) split;
+    currentIterator = openSplit();
+  }
+
+  /** Opens an iterator over the split's content file using the table schema. */
+  private CloseableIterator<T> openSplit() {
+    return openGeneric(mergeSplit.getContentFile(), getTable().schema()).iterator();
+  }
+
+  /**
+   * Advances to the next record of the content file.
+   *
+   * @return {@code true} if a record is available through {@link #getCurrentValue()}, {@code false}
+   *         once the file is exhausted (the underlying iterator is closed at that point)
+   * @throws IOException if closing the exhausted iterator fails
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    // A merge split covers exactly one file, so there is no next task to fall through to.
+    if (currentIterator.hasNext()) {
+      current = currentIterator.next();
+      return true;
+    }
+    currentIterator.close();
+    return false;
+  }
+
+  @Override
+  public T getCurrentValue() {
+    return current;
+  }
+
+  /**
+   * Closes the underlying iterator, if one was opened.
+   *
+   * @throws IOException if closing the iterator fails
+   */
+  @Override
+  public void close() throws IOException {
+    // initialize() may never have run (or may have failed), in which case there is nothing to close.
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+  }
+
+  /**
+   * Builds a record iterable for the given content file based on its file format.
+   *
+   * @param contentFile the file to read; only {@link DataFile} instances are supported
+   * @param readSchema schema to project the file on
+   * @return iterable over the records of the file
+   * @throws UnsupportedOperationException if the file is not a data file or its format is not one of
+   *         AVRO, ORC or PARQUET
+   */
+  private CloseableIterable<T> openGeneric(ContentFile<?> contentFile, Schema readSchema) {
+    // Fail fast instead of handing a null InputFile/Schema to the format specific builders.
+    if (!(contentFile instanceof DataFile)) {
+      throw new UnsupportedOperationException(
+          String.format("Cannot read non-data content file: %s", contentFile.path()));
+    }
+
+    DataFile dataFile = (DataFile) contentFile;
+    InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
+        getTable().io().newInputFile(dataFile.path().toString()),
+        dataFile.keyMetadata()));
+
+    switch (dataFile.format()) {
+      case AVRO:
+        return newAvroIterable(inputFile, readSchema);
+      case ORC:
+        return newOrcIterable(inputFile, readSchema);
+      case PARQUET:
+        return newParquetIterable(inputFile, readSchema);
+      default:
+        throw new UnsupportedOperationException(
+          String.format("Cannot read %s file: %s", dataFile.format().name(), dataFile.path()));
+    }
+  }
+
+  /** Creates an Avro reader limited to the byte range of the merge split. */
+  private CloseableIterable<T> newAvroIterable(InputFile inputFile, Schema readSchema) {
+    Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+        .project(readSchema)
+        .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+    if (isReuseContainers()) {
+      avroReadBuilder.reuseContainers();
+    }
+
+    if (getNameMapping() != null) {
+      avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    // No constants are injected: the reader gets an empty id-to-constant map.
+    avroReadBuilder.createReaderFunc(
+        (expIcebergSchema, expAvroSchema) ->
+             DataReader.create(expIcebergSchema, expAvroSchema, Maps.newHashMap()));
+
+    // No residual expression is supplied for merge reads.
+    return applyResidualFiltering(avroReadBuilder.build(), null, readSchema);
+  }
+
+  /** Creates an ORC reader limited to the byte range of the merge split. */
+  private CloseableIterable<T> newOrcIterable(InputFile inputFile, Schema readSchema) {
+    ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+        .project(readSchema)
+        .caseSensitive(isCaseSensitive())
+        .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+    if (getNameMapping() != null) {
+      orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    // No constants are injected: the reader gets an empty id-to-constant map.
+    orcReadBuilder.createReaderFunc(
+        fileSchema -> GenericOrcReader.buildReader(
+            readSchema, fileSchema, Maps.newHashMap()));
+
+    // No residual expression is supplied for merge reads.
+    return applyResidualFiltering(orcReadBuilder.build(), null, readSchema);
+  }
+
+  /** Creates a Parquet reader limited to the byte range of the merge split. */
+  private CloseableIterable<T> newParquetIterable(InputFile inputFile, Schema readSchema) {
+    Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+        .project(readSchema)
+        .caseSensitive(isCaseSensitive())
+        .split(mergeSplit.getStart(), mergeSplit.getLength());
+
+    if (isReuseContainers()) {
+      parquetReadBuilder.reuseContainers();
+    }
+
+    if (getNameMapping() != null) {
+      parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    // No constants are injected: the reader gets an empty id-to-constant map.
+    parquetReadBuilder.createReaderFunc(
+        fileSchema -> GenericParquetReaders.buildReader(
+            readSchema, fileSchema, Maps.newHashMap()));
+
+    // No residual expression is supplied for merge reads.
+    return applyResidualFiltering(parquetReadBuilder.build(), null, readSchema);
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
similarity index 50%
copy from 
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
copy to 
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
index 420bf9ea3b1..64a4b6737be 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergMergeSplit.java
@@ -22,65 +22,46 @@ package org.apache.iceberg.mr.mapreduce;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.iceberg.CombinedScanTask;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.ContentFile;
 import org.apache.iceberg.util.SerializationUtil;
 
-// Since this class extends `mapreduce.InputSplit and implements 
`mapred.InputSplit`, it can be returned by both MR v1
-// and v2 file formats.
-public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
-
-  public static final String[] ANYWHERE = new String[]{"*"};
-
-  private CombinedScanTask task;
-
-  private transient String[] locations;
+public class IcebergMergeSplit extends FileSplit implements 
org.apache.hadoop.mapred.InputSplit {
   private transient Configuration conf;
+  private ContentFile contentFile;
 
   // public no-argument constructor for deserialization
-  public IcebergSplit() {
+  public IcebergMergeSplit() {
   }
 
-  IcebergSplit(Configuration conf, CombinedScanTask task) {
-    this.task = task;
+  public IcebergMergeSplit(Configuration conf,
+                           CombineHiveInputFormat.CombineHiveInputSplit split,
+                           Integer partition, Properties properties) throws 
IOException {
+    super(split.getPaths()[partition], split
+            .getStartOffsets()[partition], split.getLengths()[partition], split
+            .getLocations());
     this.conf = conf;
-  }
-
-  public CombinedScanTask task() {
-    return task;
-  }
-
-  @Override
-  public IcebergSplit icebergSplit() {
-    return this;
+    Path path = split.getPaths()[partition];
+    contentFile = (ContentFile) properties.get(path);
   }
 
   @Override
   public long getLength() {
-    return task.files().stream().mapToLong(FileScanTask::length).sum();
+    return contentFile.fileSizeInBytes();
   }
 
   @Override
   public String[] getLocations() {
-    // The implementation of getLocations() is only meant to be used during 
split computation
-    // getLocations() won't be accurate when called on worker nodes and will 
always return "*"
-    if (locations == null && conf != null) {
-      boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, 
false);
-      locations = localityPreferred ? Util.blockLocations(task, conf) : 
ANYWHERE;
-    } else {
-      locations = ANYWHERE;
-    }
-
-    return locations;
+    return new String[]{"*"};
   }
 
   @Override
   public void write(DataOutput out) throws IOException {
-    byte[] data = SerializationUtil.serializeToBytes(this.task);
+    byte[] data = SerializationUtil.serializeToBytes(this.contentFile);
     out.writeInt(data.length);
     out.write(data);
   }
@@ -89,6 +70,10 @@ public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred
   public void readFields(DataInput in) throws IOException {
     byte[] data = new byte[in.readInt()];
     in.readFully(data);
-    this.task = SerializationUtil.deserializeFromBytes(data);
+    this.contentFile = SerializationUtil.deserializeFromBytes(data);
+  }
+
+  public ContentFile getContentFile() {
+    return contentFile;
   }
 }
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
new file mode 100644
index 00000000000..0c5b719aaa9
--- /dev/null
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergRecordReader.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataTask;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.common.DynMethods;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.data.GenericDeleteFilter;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.avro.DataReader;
+import org.apache.iceberg.data.orc.GenericOrcReader;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.hive.HiveVersion;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.mr.hive.HiveIcebergInputFormat;
+import org.apache.iceberg.mr.hive.IcebergAcidUtil;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+
+/**
+ * Record reader for Iceberg scans: walks every {@link FileScanTask} of the {@link CombinedScanTask}
+ * carried by an {@link IcebergSplit} and exposes the rows one at a time.
+ *
+ * @param <T> in-memory representation of a row
+ */
+public final class IcebergRecordReader<T> extends AbstractIcebergRecordReader<T> {
+
+  private static final String HIVE_VECTORIZED_READER_CLASS =
+      "org.apache.iceberg.mr.hive.vector.HiveVectorizedReader";
+  private static final DynMethods.StaticMethod HIVE_VECTORIZED_READER_BUILDER;
+
+  static {
+    // The vectorized reader is resolved by class name, and only when HiveVersion.min(HIVE_3) holds.
+    if (HiveVersion.min(HiveVersion.HIVE_3)) {
+      HIVE_VECTORIZED_READER_BUILDER = DynMethods.builder("reader")
+          .impl(HIVE_VECTORIZED_READER_CLASS,
+                  Table.class,
+                  Path.class,
+                  FileScanTask.class,
+                  Map.class,
+                  TaskAttemptContext.class,
+                  Expression.class,
+                  Schema.class)
+                .buildStatic();
+    } else {
+      HIVE_VECTORIZED_READER_BUILDER = null;
+    }
+  }
+
+  private Iterator<FileScanTask> tasks;
+  private CloseableIterator<T> currentIterator;
+  private T current;
+
+  /**
+   * Initializes the parent reader state and opens the first file scan task of the split.
+   *
+   * @param split the split to read; must be an {@link IcebergSplit}
+   * @param newContext task attempt context, forwarded to the parent class
+   */
+  @Override
+  public void initialize(InputSplit split, TaskAttemptContext newContext) {
+    // For now IcebergInputFormat does its own split planning and does not accept FileSplit instances
+    super.initialize(split, newContext);
+    CombinedScanTask task = ((IcebergSplit) split).task();
+    this.tasks = task.files().iterator();
+    this.currentIterator = nextTask();
+  }
+
+  /**
+   * Opens the next file scan task. The iterator is wrapped in a virtual-column aware iterator only
+   * when virtual columns are requested and the execution is not vectorized.
+   */
+  private CloseableIterator<T> nextTask() {
+    CloseableIterator<T> rows = open(tasks.next(), getExpectedSchema()).iterator();
+    if (isFetchVirtualColumns() && !Utilities.getIsVectorized(getConf())) {
+      return new IcebergAcidUtil.VirtualColumnAwareIterator<T>(rows, getExpectedSchema(), getConf());
+    }
+    return rows;
+  }
+
+  /**
+   * Advances to the next record, moving on to the following file scan task whenever the current one
+   * is exhausted.
+   *
+   * @return {@code true} if a record is available through {@link #getCurrentValue()}, {@code false}
+   *         once every task of the split has been consumed
+   * @throws IOException if closing an exhausted iterator fails
+   */
+  @Override
+  public boolean nextKeyValue() throws IOException {
+    while (!currentIterator.hasNext()) {
+      currentIterator.close();
+      if (!tasks.hasNext()) {
+        return false;
+      }
+      this.currentIterator = nextTask();
+    }
+    current = currentIterator.next();
+    return true;
+  }
+
+  @Override
+  public T getCurrentValue() {
+    return current;
+  }
+
+  /**
+   * Closes the iterator of the task currently being read, if one was opened.
+   *
+   * @throws IOException if closing the iterator fails
+   */
+  @Override
+  public void close() throws IOException {
+    // initialize() may never have run (or may have failed), in which case there is nothing to close.
+    if (currentIterator != null) {
+      currentIterator.close();
+    }
+  }
+
+  /**
+   * Opens a task through the dynamically loaded Hive vectorized reader.
+   *
+   * @throws IllegalArgumentException if the file is an Avro file or the Hive version is below 3
+   */
+  private CloseableIterable<T> openVectorized(FileScanTask task, Schema readSchema) {
+    Preconditions.checkArgument(!task.file().format().equals(FileFormat.AVRO),
+        "Vectorized execution is not yet supported for Iceberg avro tables. " +
+        "Please turn off vectorization and retry the query.");
+    Preconditions.checkArgument(HiveVersion.min(HiveVersion.HIVE_3),
+        "Vectorized read is unsupported for Hive 2 integration.");
+
+    Path path = new Path(task.file().path().toString());
+    Map<Integer, ?> idToConstant = constantsMap(task, HiveIdentityPartitionConverters::convertConstant);
+    Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+    // TODO: We have to take care of the EncryptionManager when LLAP and vectorization is used
+    CloseableIterable<T> iterator = HIVE_VECTORIZED_READER_BUILDER.invoke(getTable(), path, task,
+        idToConstant, getContext(), residual, readSchema);
+
+    return applyResidualFiltering(iterator, residual, readSchema);
+  }
+
+  /**
+   * Opens a task with the generic (non-vectorized) readers, dispatching on the file format.
+   *
+   * @throws UnsupportedOperationException if the file format is not one of AVRO, ORC or PARQUET
+   */
+  @SuppressWarnings("unchecked")
+  private CloseableIterable<T> openGeneric(FileScanTask task, Schema readSchema) {
+    if (task.isDataTask()) {
+      // When querying metadata tables, the currentTask is a DataTask and the data has to
+      // be fetched from the task instead of reading it from files.
+      IcebergInternalRecordWrapper wrapper =
+          new IcebergInternalRecordWrapper(getTable().schema().asStruct(), readSchema.asStruct());
+      return (CloseableIterable) CloseableIterable.transform(((DataTask) task).rows(), row -> wrapper.wrap(row));
+    }
+
+    DataFile file = task.file();
+    InputFile inputFile = getTable().encryption().decrypt(EncryptedFiles.encryptedInput(
+        getTable().io().newInputFile(file.path().toString()),
+        file.keyMetadata()));
+
+    switch (file.format()) {
+      case AVRO:
+        return newAvroIterable(inputFile, task, readSchema);
+      case ORC:
+        return newOrcIterable(inputFile, task, readSchema);
+      case PARQUET:
+        return newParquetIterable(inputFile, task, readSchema);
+      default:
+        throw new UnsupportedOperationException(
+            String.format("Cannot read %s file: %s", file.format().name(), file.path()));
+    }
+  }
+
+  /**
+   * Opens a task according to the configured in-memory data model. For the GENERIC model the rows
+   * are read with the schema required by the delete filter and then passed through that filter.
+   */
+  @SuppressWarnings("unchecked")
+  private CloseableIterable<T> open(FileScanTask currentTask, Schema readSchema) {
+    switch (getInMemoryDataModel()) {
+      case PIG:
+        // TODO: Support Pig and Hive object models for IcebergInputFormat
+        throw new UnsupportedOperationException("Pig and Hive object models are not supported.");
+      case HIVE:
+        return openVectorized(currentTask, readSchema);
+      case GENERIC:
+        DeleteFilter deletes = new GenericDeleteFilter(getTable().io(), currentTask, getTable().schema(), readSchema);
+        Schema requiredSchema = deletes.requiredSchema();
+        return deletes.filter(openGeneric(currentTask, requiredSchema));
+      default:
+        throw new UnsupportedOperationException("Unsupported memory model");
+    }
+  }
+
+  /** Creates an Avro reader for the byte range of the task. */
+  private CloseableIterable<T> newAvroIterable(
+          InputFile inputFile, FileScanTask task, Schema readSchema) {
+    Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+    Avro.ReadBuilder avroReadBuilder = Avro.read(inputFile)
+        .project(readSchema)
+        .split(task.start(), task.length());
+
+    if (isReuseContainers()) {
+      avroReadBuilder.reuseContainers();
+    }
+
+    if (getNameMapping() != null) {
+      avroReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    avroReadBuilder.createReaderFunc(
+        (expIcebergSchema, expAvroSchema) ->
+             DataReader.create(expIcebergSchema, expAvroSchema,
+                 constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+    return applyResidualFiltering(avroReadBuilder.build(), residual, readSchema);
+  }
+
+  /** Creates a Parquet reader for the byte range of the task, with the residual set as its filter. */
+  private CloseableIterable<T> newParquetIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+    Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+    Parquet.ReadBuilder parquetReadBuilder = Parquet.read(inputFile)
+        .project(readSchema)
+        .filter(residual)
+        .caseSensitive(isCaseSensitive())
+        .split(task.start(), task.length());
+
+    if (isReuseContainers()) {
+      parquetReadBuilder.reuseContainers();
+    }
+
+    if (getNameMapping() != null) {
+      parquetReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    parquetReadBuilder.createReaderFunc(
+        fileSchema -> GenericParquetReaders.buildReader(
+            readSchema, fileSchema, constantsMap(task, IdentityPartitionConverters::convertConstant)));
+
+    return applyResidualFiltering(parquetReadBuilder.build(), residual, readSchema);
+  }
+
+  /**
+   * Creates an ORC reader for the byte range of the task. The file is projected on the read schema
+   * without constant and metadata fields, while the row reader is built with the full read schema
+   * and the constants map.
+   */
+  private CloseableIterable<T> newOrcIterable(InputFile inputFile, FileScanTask task, Schema readSchema) {
+    Map<Integer, ?> idToConstant = constantsMap(task, IdentityPartitionConverters::convertConstant);
+    Schema readSchemaWithoutConstantAndMetadataFields = schemaWithoutConstantsAndMeta(readSchema, idToConstant);
+    Expression residual = HiveIcebergInputFormat.residualForTask(task, getContext().getConfiguration());
+
+    ORC.ReadBuilder orcReadBuilder = ORC.read(inputFile)
+        .project(readSchemaWithoutConstantAndMetadataFields)
+        .filter(residual)
+        .caseSensitive(isCaseSensitive())
+        .split(task.start(), task.length());
+
+    if (getNameMapping() != null) {
+      orcReadBuilder.withNameMapping(NameMappingParser.fromJson(getNameMapping()));
+    }
+
+    orcReadBuilder.createReaderFunc(
+        fileSchema -> GenericOrcReader.buildReader(
+            readSchema, fileSchema, idToConstant));
+
+    return applyResidualFiltering(orcReadBuilder.build(), residual, readSchema);
+  }
+
+  /**
+   * Builds the field-id to constant-value map for a task.
+   *
+   * @param task the task whose partition data supplies the constants
+   * @param converter conversion applied to the partition values
+   * @return the constants map when the expected schema contains the partition metadata column or at
+   *         least one identity partition source column; an empty map otherwise
+   */
+  private Map<Integer, ?> constantsMap(FileScanTask task, BiFunction<Type, Object, Object> converter) {
+    boolean projectsPartitionStruct =
+        getExpectedSchema().findField(MetadataColumns.PARTITION_COLUMN_ID) != null;
+    if (projectsPartitionStruct || projectsIdentityPartitionColumns(task.spec())) {
+      Types.StructType partitionType = Partitioning.partitionType(getTable());
+      return PartitionUtil.constantsMap(task, partitionType, converter);
+    }
+    return Collections.emptyMap();
+  }
+
+  /** Tells whether the expected schema selects any identity partition source column of the spec. */
+  private boolean projectsIdentityPartitionColumns(PartitionSpec spec) {
+    Set<Integer> idColumns = spec.identitySourceIds();
+    Schema partitionSchema = TypeUtil.select(getExpectedSchema(), idColumns);
+    return !partitionSchema.columns().isEmpty();
+  }
+
+  /**
+   * Returns the read schema with constant fields, metadata columns and the nested fields of the
+   * partition struct removed.
+   */
+  private static Schema schemaWithoutConstantsAndMeta(Schema readSchema, Map<Integer, ?> idToConstant) {
+    // remove the nested fields of the partition struct
+    Set<Integer> partitionFields = Optional.ofNullable(readSchema.findField(MetadataColumns.PARTITION_COLUMN_ID))
+        .map(Types.NestedField::type)
+        .map(Type::asStructType)
+        .map(Types.StructType::fields)
+        .map(fields -> fields.stream().map(Types.NestedField::fieldId).collect(Collectors.toSet()))
+        .orElseGet(Collections::emptySet);
+
+    // remove constants and meta columns too
+    Set<Integer> collect = Stream.of(idToConstant.keySet(), MetadataColumns.metadataFieldIds(), partitionFields)
+        .flatMap(Set::stream)
+        .collect(Collectors.toSet());
+
+    return TypeUtil.selectNot(readSchema, collect);
+  }
+}
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
index 420bf9ea3b1..395ea35fe47 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -32,7 +32,7 @@ import org.apache.iceberg.util.SerializationUtil;
 
 // Since this class extends `mapreduce.InputSplit and implements 
`mapred.InputSplit`, it can be returned by both MR v1
 // and v2 file formats.
-public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
+public class IcebergSplit extends InputSplit implements IcebergSplitContainer {
 
   public static final String[] ANYWHERE = new String[]{"*"};
 
diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
index c77543763b1..b989444957c 100644
--- 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
+++ 
b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
@@ -19,7 +19,9 @@
 
 package org.apache.iceberg.mr.mapreduce;
 
-public interface IcebergSplitContainer {
+import org.apache.hadoop.mapred.InputSplit;
+
+public interface IcebergSplitContainer extends InputSplit {
 
   IcebergSplit icebergSplit();
 
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
index d057bf4f460..e5f407ee15c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -368,18 +370,14 @@ public class CombineHiveInputFormat<K extends 
WritableComparable, V extends Writ
       PartitionDesc part = HiveFileFormatUtils.getFromPathRecursively(
           pathToPartitionInfo, path, 
IOPrepareCache.get().allocatePartitionDescMap());
       TableDesc tableDesc = part.getTableDesc();
-      if (tableDesc != null) {
-        boolean useDefaultFileFormat = part.getInputFileFormatClass()
-                .isAssignableFrom(tableDesc.getInputFileFormatClass());
-        if (tableDesc.isNonNative() && useDefaultFileFormat) {
-          return super.getSplits(job, numSplits);
-        }
-      }
-
       // Use HiveInputFormat if any of the paths is not splittable
       Class inputFormatClass = part.getInputFileFormatClass();
       String inputFormatClassName = inputFormatClass.getName();
       InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
+      if (tableDesc != null && tableDesc.isNonNative() && 
mrwork.getMergeSplitProperties() == null) {
+        return super.getSplits(job, numSplits);
+      }
+
       String deserializerClassName = null;
       try {
         deserializerClassName = part.getDeserializer(job).getClass().getName();
@@ -765,4 +763,8 @@ public class CombineHiveInputFormat<K extends 
WritableComparable, V extends Writ
   public interface AvoidSplitCombination {
     boolean shouldSkipCombine(Path path, Configuration conf) throws 
IOException;
   }
+
+  /**
+   * Extension of {@link AvoidSplitCombination} for input formats that build the per-file
+   * {@link FileSplit} for one entry of a {@link CombineHiveInputSplit} themselves.
+   */
+  public interface MergeSplits extends AvoidSplitCombination {
+    /**
+     * Creates the split for a single entry of a combined split.
+     *
+     * @param conf job configuration
+     * @param split the combined split holding the entry
+     * @param partition index of the entry within the combined split
+     * @param splitProperties properties supplied for building the merge split
+     * @return the split to read for that entry
+     * @throws IOException if the split cannot be created
+     */
+    FileSplit createMergeSplit(
+        Configuration conf,
+        CombineHiveInputSplit split,
+        Integer partition,
+        Properties splitProperties) throws IOException;
+  }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index 7a687864cea..c0118d5e3d8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -70,14 +70,12 @@ public class CombineHiveRecordReader<K extends 
WritableComparable, V extends Wri
           + inputFormatClassName);
     }
     InputFormat inputFormat = 
HiveInputFormat.getInputFormatFromCache(inputFormatClass, jobConf);
+    MapWork mrwork = null;
     if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_IO_ENABLED, 
LlapProxy.isDaemon())) {
       try {
         // TODO : refactor this out
         if (pathToPartInfo == null) {
-          MapWork mrwork = (MapWork) Utilities.getMergeWork(jobConf);
-          if (mrwork == null) {
-            mrwork = Utilities.getMapWork(jobConf);
-          }
+          mrwork = getMrWork(jobConf);
           pathToPartInfo = mrwork.getPathToPartitionInfo();
         }
 
@@ -88,14 +86,21 @@ public class CombineHiveRecordReader<K extends 
WritableComparable, V extends Wri
       }
     }
 
+    FileSplit inputSplit;
     // create a split for the given partition
-    FileSplit fsplit = new FileSplit(hsplit.getPaths()[partition], hsplit
-        .getStartOffsets()[partition], hsplit.getLengths()[partition], hsplit
-        .getLocations());
+    if (inputFormat instanceof CombineHiveInputFormat.MergeSplits) {
+      mrwork = getMrWork(jobConf);
+      inputSplit = ((CombineHiveInputFormat.MergeSplits) 
inputFormat).createMergeSplit(jobConf, hsplit, partition,
+              mrwork.getMergeSplitProperties());
+    } else {
+      inputSplit = new FileSplit(hsplit.getPaths()[partition], hsplit
+              .getStartOffsets()[partition], hsplit.getLengths()[partition], 
hsplit
+              .getLocations());
+    }
 
-    this.setRecordReader(inputFormat.getRecordReader(fsplit, jobConf, 
reporter));
+    this.setRecordReader(inputFormat.getRecordReader(inputSplit, jobConf, 
reporter));
 
-    this.initIOContext(fsplit, jobConf, inputFormatClass, this.recordReader);
+    this.initIOContext(inputSplit, jobConf, inputFormatClass, 
this.recordReader);
 
     //If current split is from the same file as preceding split and the 
preceding split has footerbuffer,
     //the current split should use the preceding split's footerbuffer in order 
to skip footer correctly.
@@ -127,6 +132,14 @@ public class CombineHiveRecordReader<K extends 
WritableComparable, V extends Wri
     return part;
   }
 
+  /**
+   * Looks up the work description for this job: the merge work when one is registered in the job
+   * configuration, otherwise the map work.
+   */
+  private MapWork getMrWork(JobConf jobConf) {
+    MapWork mergeWork = (MapWork) Utilities.getMergeWork(jobConf);
+    return mergeWork != null ? mergeWork : Utilities.getMapWork(jobConf);
+  }
+
   @Override
   public void doClose() throws IOException {
     recordReader.close();
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
index 05e54798840..4e6d4c3bcfe 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java
@@ -349,9 +349,6 @@ public class ConditionalResolverMergeFiles implements 
ConditionalResolver,
           Utilities.FILE_OP_LOGGER.warn("merger ignoring invalid DP path " + 
status[i].getPath());
           continue;
         }
-        if (useCustomStorageHandler) {
-          updatePartDescProperties(pDesc, mergeProperties);
-        }
         Utilities.FILE_OP_LOGGER.debug("merge resolver will merge " + 
status[i].getPath());
         work.resolveDynamicPartitionStoredAsSubDirsMerge(conf, 
status[i].getPath(), tblDesc,
             aliases, pDesc);
@@ -568,11 +565,11 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
       mapWork.setAliasToWork(aliasToWork);
     }
     if (partitionDesc != null) {
-      updatePartDescProperties(partitionDesc, mergeProperties);
       pathToPartitionInfo.remove(dirPath);
       pathToPartitionInfo.put(tmpDir, partitionDesc);
       mapWork.setPathToPartitionInfo(pathToPartitionInfo);
     }
+    mapWork.setMergeSplitProperties(mergeProperties.getSplitProperties());
     mapWork.removePathToAlias(dirPath);
     mapWork.addPathToAlias(tmpDir, tmpDir.toString());
     mapWork.setUseInputPathsDirectly(true);
@@ -593,22 +590,4 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver,
     }
     return manifestDirsToPaths;
   }
-
-  private void updatePartDescProperties(PartitionDesc partitionDesc,
-                                        MergeTaskProperties mergeProperties) throws IOException, ClassNotFoundException {
-    if (mergeProperties != null) {
-      String inputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getInputFormat();
-      String outputFileFormatClassName = mergeProperties.getStorageFormatDescriptor().getOutputFormat();
-      String serdeClassName = mergeProperties.getStorageFormatDescriptor().getSerde();
-      if (inputFileFormatClassName != null) {
-        partitionDesc.setInputFileFormatClass(JavaUtils.loadClass(inputFileFormatClassName));
-      }
-      if (outputFileFormatClassName != null) {
-        partitionDesc.setOutputFileFormatClass(JavaUtils.loadClass(outputFileFormatClassName));
-      }
-      if (serdeClassName != null) {
-        partitionDesc.getTableDesc().getProperties().setProperty(serdeConstants.SERIALIZATION_LIB, serdeClassName);
-      }
-    }
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
index 076ef0a99b7..e705af31dea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
@@ -22,18 +22,8 @@ import org.apache.hadoop.hive.common.StringInternUtils;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator.ProbeDecodeContext;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.BitSet;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -184,6 +174,8 @@ public class MapWork extends BaseWork {
 
   private boolean useInputPathsDirectly;
 
+  private Properties mergeSplitProperties;
+
   public MapWork() {}
 
   public MapWork(String name) {
@@ -954,4 +946,12 @@ public class MapWork extends BaseWork {
   public boolean isUseInputPathsDirectly() {
     return useInputPathsDirectly;
   }
+
+  public Properties getMergeSplitProperties() {
+    return mergeSplitProperties;
+  }
+
+  public void setMergeSplitProperties(Properties mergeSplitProperties) {
+    this.mergeSplitProperties = mergeSplitProperties;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
index 6083b1b117f..fdf8451d273 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MergeTaskProperties.java
@@ -19,12 +19,16 @@
 package org.apache.hadoop.hive.ql.plan;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Properties;
 
 public interface MergeTaskProperties {
   public Path getTmpLocation();
 
-  public StorageFormatDescriptor getStorageFormatDescriptor() throws IOException;
+  default Properties getSplitProperties() throws IOException {
+    return null;
+  }
 }


Reply via email to