This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 129c969  Hive: Add mapred InputFormat (#1192)
129c969 is described below

commit 129c9693683fe0f3937f467d1fd817cb602f8224
Author: Adrien Guillo <[email protected]>
AuthorDate: Wed Jul 22 10:06:21 2020 -0700

    Hive: Add mapred InputFormat (#1192)
---
 build.gradle                                       |  24 +
 .../org/apache/iceberg/mr/InputFormatConfig.java   |  14 +
 .../iceberg/mr/hive/HiveIcebergInputFormat.java    |  85 ++++
 .../apache/iceberg/mr/hive/HiveIcebergSplit.java   |  93 ++++
 .../{IcebergWritable.java => Container.java}       |  34 +-
 .../org/apache/iceberg/mr/mapred/IcebergSerDe.java |   4 +-
 .../mr/mapred/MapredIcebergInputFormat.java        | 191 +++++++
 .../apache/iceberg/mr/mapred/TableResolver.java    |   4 +-
 .../iceberg/mr/mapreduce/IcebergInputFormat.java   |  58 +--
 .../apache/iceberg/mr/mapreduce/IcebergSplit.java  |  89 ++++
 .../mr/mapreduce/IcebergSplitContainer.java        |  26 +
 .../java/org/apache/iceberg/mr/TestHelper.java     | 226 +++++++++
 .../apache/iceberg/mr/TestIcebergInputFormats.java | 485 ++++++++++++++++++
 .../mr/hive/TestHiveIcebergInputFormat.java        | 169 +++++++
 .../org/apache/iceberg/mr/mapred/TestHelpers.java  | 119 -----
 .../apache/iceberg/mr/mapred/TestIcebergSerDe.java |   5 +-
 .../mr/mapreduce/TestIcebergInputFormat.java       | 557 ---------------------
 17 files changed, 1420 insertions(+), 763 deletions(-)

diff --git a/build.gradle b/build.gradle
index babd164..6510cc5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -79,6 +79,7 @@ subprojects {
     all {
       exclude group: 'org.slf4j', module: 'slf4j-log4j12'
       exclude group: 'org.mortbay.jetty'
+      exclude group: 'org.pentaho', module: 'pentaho-aggdesigner-algorithm'
 
       resolutionStrategy {
         force 'com.fasterxml.jackson.module:jackson-module-scala_2.11:2.10.2'
@@ -371,11 +372,34 @@ project(':iceberg-mr') {
     compileOnly("org.apache.hadoop:hadoop-client") {
       exclude group: 'org.apache.avro', module: 'avro'
     }
+
+    compileOnly("org.apache.hive:hive-exec::core") {
+      exclude group: 'com.google.code.findbugs', module: 'jsr305'
+      exclude group: 'com.google.guava'
+      exclude group: 'com.google.protobuf', module: 'protobuf-java'
+      exclude group: 'org.apache.avro', module: 'avro'
+      exclude group: 'org.apache.calcite.avatica'
+      exclude group: 'org.apache.hive', module: 'hive-llap-tez'
+      exclude group: 'org.apache.logging.log4j'
+      exclude group: 'org.pentaho' // missing dependency
+      exclude group: 'org.slf4j', module: 'slf4j-log4j12'
+    }
     compileOnly("org.apache.hive:hive-serde")
 
     testCompile project(path: ':iceberg-data', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-api', configuration: 'testArtifacts')
     testCompile project(path: ':iceberg-core', configuration: 'testArtifacts')
+
+    testCompile("org.apache.avro:avro:1.9.2")
+    testCompile("org.apache.calcite:calcite-core")
+    testCompile("com.esotericsoftware:kryo-shaded:4.0.2")
+    testCompile("com.fasterxml.jackson.core:jackson-annotations:2.6.5")
+    testCompile("com.klarna:hiverunner:5.2.1") {
+      exclude group: 'javax.jms', module: 'jms'
+      exclude group: 'org.apache.hive', module: 'hive-exec'
+      exclude group: 'org.codehaus.jettison', module: 'jettison'
+      exclude group: 'org.apache.calcite.avatica'
+    }
   }
 }
 
diff --git a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java 
b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
index f4b9aae..e266155 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
@@ -19,11 +19,13 @@
 
 package org.apache.iceberg.mr;
 
+import java.io.File;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 
 public class InputFormatConfig {
@@ -73,6 +75,10 @@ public class InputFormatConfig {
       conf.setBoolean(LOCALITY, false);
     }
 
+    public Configuration conf() {
+      return conf;
+    }
+
     public ConfigBuilder filter(Expression expression) {
       conf.set(FILTER_EXPRESSION, 
SerializationUtil.serializeToBase64(expression));
       return this;
@@ -88,6 +94,14 @@ public class InputFormatConfig {
       return this;
     }
 
+    public ConfigBuilder readFrom(TableIdentifier identifier) {
+      return readFrom(identifier.toString());
+    }
+
+    public ConfigBuilder readFrom(File path) {
+      return readFrom(path.toString());
+    }
+
     public ConfigBuilder readFrom(String path) {
       conf.set(TABLE_PATH, path);
       return this;
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
new file mode 100644
index 0000000..0b9fd70
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapred.TableResolver;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+
+public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
+                                    implements 
CombineHiveInputFormat.AvoidSplitCombination {
+
+  private transient Table table;
+  private transient Schema schema;
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
+    table = TableResolver.resolveTableFromConfiguration(job);
+    schema = table.schema();
+
+    forwardConfigSettings(job);
+
+    return Arrays.stream(super.getSplits(job, numSplits))
+                 .map(split -> new HiveIcebergSplit((IcebergSplit) split, 
table.location()))
+                 .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<Record>> getRecordReader(InputSplit 
split, JobConf job,
+                                                               Reporter 
reporter) throws IOException {
+    // Since Hive passes a copy of `job` in `getSplits`, we need to forward 
the conf settings again.
+    forwardConfigSettings(job);
+    return super.getRecordReader(split, job, reporter);
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) {
+    return true;
+  }
+
+  /**
+   * Forward configuration settings to the underlying MR input format.
+   */
+  private void forwardConfigSettings(JobConf job) {
+    Preconditions.checkNotNull(table, "Table cannot be null");
+    Preconditions.checkNotNull(schema, "Schema cannot be null");
+
+    // Once mapred.TableResolver and mapreduce.TableResolver use the same 
property for the location of the table
+    // (TABLE_LOCATION vs. TABLE_PATH), this line can be removed: see 
https://github.com/apache/iceberg/issues/1155.
+    job.set(InputFormatConfig.TABLE_PATH, table.location());
+    job.set(InputFormatConfig.TABLE_SCHEMA, SchemaParser.toJson(schema));
+  }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java 
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
new file mode 100644
index 0000000..e94ff20
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergSplit.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.iceberg.mr.SerializationUtil;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+// Hive requires file formats to return splits that are instances of 
`FileSplit`.
+public class HiveIcebergSplit extends FileSplit implements 
IcebergSplitContainer {
+
+  private IcebergSplit innerSplit;
+
+  // Hive uses the path name of a split to map it back to a partition 
(`PartitionDesc`) or table description object
+  // (`TableDesc`) which specifies the relevant input format for reading the 
files belonging to that partition or table.
+  // That way, `HiveInputFormat` and `CombineHiveInputFormat` can read files 
with different file formats in the same
+  // MapReduce job and merge compatible splits together.
+  private String tableLocation;
+
+  // public no-argument constructor for deserialization
+  public HiveIcebergSplit() {}
+
+  HiveIcebergSplit(IcebergSplit split, String tableLocation) {
+    this.innerSplit = split;
+    this.tableLocation = tableLocation;
+  }
+
+  @Override
+  public IcebergSplit icebergSplit() {
+    return innerSplit;
+  }
+
+  @Override
+  public long getLength() {
+    return innerSplit.getLength();
+  }
+
+  @Override
+  public String[] getLocations() {
+    return innerSplit.getLocations();
+  }
+
+  @Override
+  public Path getPath() {
+    return new Path(tableLocation);
+  }
+
+  @Override
+  public long getStart() {
+    return 0;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    byte[] bytes = SerializationUtil.serializeToBytes(tableLocation);
+    out.writeInt(bytes.length);
+    out.write(bytes);
+
+    innerSplit.write(out);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] bytes = new byte[in.readInt()];
+    in.readFully(bytes);
+    tableLocation = SerializationUtil.deserializeFromBytes(bytes);
+
+    innerSplit = new IcebergSplit();
+    innerSplit.readFields(in);
+  }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
similarity index 64%
rename from mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
rename to mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
index 1eb67f9..fda174e 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergWritable.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/Container.java
@@ -22,38 +22,22 @@ package org.apache.iceberg.mr.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import org.apache.hadoop.io.Writable;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.data.Record;
 
 /**
- * Wraps an Iceberg Record in a Writable which Hive can use in the SerDe.
+ * A simple container of objects that you can get and set.
+ *
+ * @param <T> the Java type of the object held by this container
  */
-public class IcebergWritable implements Writable {
-
-  private Record record;
-  private Schema schema;
-
-  public IcebergWritable(Record record, Schema schema) {
-    this.record = record;
-    this.schema = schema;
-  }
+public class Container<T> implements Writable {
 
-  @SuppressWarnings("checkstyle:HiddenField")
-  public void wrapRecord(Record record) {
-    this.record = record;
-  }
-
-  public Record record() {
-    return record;
-  }
+  private T value;
 
-  public Schema schema() {
-    return schema;
+  public T get() {
+    return value;
   }
 
-  @SuppressWarnings("checkstyle:HiddenField")
-  public void wrapSchema(Schema schema) {
-    this.schema = schema;
+  public void set(T newValue) {
+    this.value = newValue;
   }
 
   @Override
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
index 871e0e6..f0ee139 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/IcebergSerDe.java
@@ -55,7 +55,7 @@ public class IcebergSerDe extends AbstractSerDe {
 
   @Override
   public Class<? extends Writable> getSerializedClass() {
-    return IcebergWritable.class;
+    return Container.class;
   }
 
   @Override
@@ -70,7 +70,7 @@ public class IcebergSerDe extends AbstractSerDe {
 
   @Override
   public Object deserialize(Writable writable) {
-    return ((IcebergWritable) writable).record();
+    return ((Container<?>) writable).get();
   }
 
   @Override
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
new file mode 100644
index 0000000..5ecba15
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/MapredIcebergInputFormat.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapred;
+
+import java.io.IOException;
+import java.util.Optional;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.StatusReporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.mr.mapreduce.IcebergSplitContainer;
+
+/**
+ * Generic MR v1 InputFormat API for Iceberg.
+ *
+ * @param <T> Java class of records constructed by Iceberg; default is {@link 
Record}
+ */
+public class MapredIcebergInputFormat<T> implements InputFormat<Void, 
Container<T>> {
+
+  private final org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> 
innerInputFormat;
+
+  public MapredIcebergInputFormat() {
+    this.innerInputFormat = new 
org.apache.iceberg.mr.mapreduce.IcebergInputFormat<>();
+  }
+
+  /**
+   * Configures the {@code JobConf} to use the {@code 
MapredIcebergInputFormat} and
+   * returns a helper to add further configuration.
+   *
+   * @param job the {@code JobConf} to configure
+   */
+  public static InputFormatConfig.ConfigBuilder configure(JobConf job) {
+    job.setInputFormat(MapredIcebergInputFormat.class);
+    return new InputFormatConfig.ConfigBuilder(job);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
+    return innerInputFormat.getSplits(newJobContext(job))
+                           .stream()
+                           .map(InputSplit.class::cast)
+                           .toArray(InputSplit[]::new);
+  }
+
+  @Override
+  public RecordReader<Void, Container<T>> getRecordReader(InputSplit split, 
JobConf job,
+                                                          Reporter reporter) 
throws IOException {
+    IcebergSplit icebergSplit = ((IcebergSplitContainer) split).icebergSplit();
+    return new MapredIcebergRecordReader<>(innerInputFormat, icebergSplit, 
job, reporter);
+  }
+
+  private static final class MapredIcebergRecordReader<T> implements 
RecordReader<Void, Container<T>> {
+
+    private final org.apache.hadoop.mapreduce.RecordReader<Void, T> 
innerReader;
+    private final long splitLength; // for getPos()
+
+    
MapredIcebergRecordReader(org.apache.iceberg.mr.mapreduce.IcebergInputFormat<T> 
mapreduceInputFormat,
+                              IcebergSplit split, JobConf job, Reporter 
reporter) throws IOException {
+      TaskAttemptContext context = newTaskAttemptContext(job, reporter);
+
+      try {
+        innerReader = mapreduceInputFormat.createRecordReader(split, context);
+        innerReader.initialize(split, context);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+
+      splitLength = split.getLength();
+    }
+
+    @Override
+    public boolean next(Void key, Container<T> value) throws IOException {
+      try {
+        if (innerReader.nextKeyValue()) {
+          value.set(innerReader.getCurrentValue());
+          return true;
+        }
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ie);
+      }
+
+      return false;
+    }
+
+    @Override
+    public Void createKey() {
+      return null;
+    }
+
+    @Override
+    public Container<T> createValue() {
+      return new Container<>();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return (long) (splitLength * getProgress());
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      try {
+        return innerReader.getProgress();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (innerReader != null) {
+        innerReader.close();
+      }
+    }
+  }
+
+  private static JobContext newJobContext(JobConf job) {
+    JobID jobID = Optional.ofNullable(JobID.forName(job.get(JobContext.ID)))
+                          .orElse(new JobID());
+
+    return new JobContextImpl(job, jobID);
+  }
+
+  private static TaskAttemptContext newTaskAttemptContext(JobConf job, 
Reporter reporter) {
+    TaskAttemptID taskAttemptID = 
Optional.ofNullable(TaskAttemptID.forName(job.get(JobContext.TASK_ATTEMPT_ID)))
+                                          .orElse(new TaskAttemptID());
+
+    return new TaskAttemptContextImpl(job, taskAttemptID, 
toStatusReporter(reporter));
+  }
+
+  private static StatusReporter toStatusReporter(Reporter reporter) {
+    return new StatusReporter() {
+      @Override
+      public Counter getCounter(Enum<?> name) {
+        return reporter.getCounter(name);
+      }
+
+      @Override
+      public Counter getCounter(String group, String name) {
+        return reporter.getCounter(group, name);
+      }
+
+      @Override
+      public void progress() {
+        reporter.progress();
+      }
+
+      @Override
+      public float getProgress() {
+        return reporter.getProgress();
+      }
+
+      @Override
+      public void setStatus(String status) {
+        reporter.setStatus(status);
+      }
+    };
+  }
+}
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
index 7ee670d..c8c7414 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapred/TableResolver.java
@@ -29,7 +29,7 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.mr.InputFormatConfig;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
-final class TableResolver {
+public final class TableResolver {
 
   private TableResolver() {
   }
@@ -42,7 +42,7 @@ final class TableResolver {
     return resolveTableFromConfiguration(configuration);
   }
 
-  static Table resolveTableFromConfiguration(Configuration conf) throws 
IOException {
+  public static Table resolveTableFromConfiguration(Configuration conf) throws 
IOException {
     //Default to HadoopTables
     String catalogName = conf.get(InputFormatConfig.CATALOG_NAME, 
InputFormatConfig.HADOOP_TABLES);
 
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
index 040194c..7c0a737 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergInputFormat.java
@@ -19,8 +19,6 @@
 
 package org.apache.iceberg.mr.mapreduce;
 
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.util.Collections;
@@ -31,7 +29,6 @@ import java.util.Set;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
@@ -62,7 +59,6 @@ import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.hadoop.HadoopInputFile;
 import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.hadoop.Util;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.io.InputFile;
@@ -87,8 +83,6 @@ import org.slf4j.LoggerFactory;
 public class IcebergInputFormat<T> extends InputFormat<Void, T> {
   private static final Logger LOG = 
LoggerFactory.getLogger(IcebergInputFormat.class);
 
-  private transient List<InputSplit> splits;
-
   /**
    * Configures the {@code Job} to use the {@code IcebergInputFormat} and
    * returns a helper to add further configuration.
@@ -102,11 +96,6 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
 
   @Override
   public List<InputSplit> getSplits(JobContext context) {
-    if (splits != null) {
-      LOG.info("Returning cached splits: {}", splits.size());
-      return splits;
-    }
-
     Configuration conf = context.getConfiguration();
     Table table = findTable(conf);
     TableScan scan = table.newScan()
@@ -134,7 +123,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
       scan = scan.filter(filter);
     }
 
-    splits = Lists.newArrayList();
+    List<InputSplit> splits = Lists.newArrayList();
     boolean applyResidual = 
!conf.getBoolean(InputFormatConfig.SKIP_RESIDUAL_FILTERING, false);
     InputFormatConfig.InMemoryDataModel model = 
conf.getEnum(InputFormatConfig.IN_MEMORY_DATA_MODEL,
         InputFormatConfig.InMemoryDataModel.GENERIC);
@@ -186,7 +175,7 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     public void initialize(InputSplit split, TaskAttemptContext newContext) {
       Configuration conf = newContext.getConfiguration();
       // For now IcebergInputFormat does its own split planning and does not 
accept FileSplit instances
-      CombinedScanTask task = ((IcebergSplit) split).task;
+      CombinedScanTask task = ((IcebergSplit) split).task();
       this.context = newContext;
       this.tasks = task.files().iterator();
       this.tableSchema = 
SchemaParser.fromJson(conf.get(InputFormatConfig.TABLE_SCHEMA));
@@ -382,47 +371,4 @@ public class IcebergInputFormat<T> extends 
InputFormat<Void, T> {
     }
   }
 
-  static class IcebergSplit extends InputSplit implements Writable {
-    static final String[] ANYWHERE = new String[]{"*"};
-    private CombinedScanTask task;
-    private transient String[] locations;
-    private transient Configuration conf;
-
-    IcebergSplit(Configuration conf, CombinedScanTask task) {
-      this.task = task;
-      this.conf = conf;
-    }
-
-    @Override
-    public long getLength() {
-      return task.files().stream().mapToLong(FileScanTask::length).sum();
-    }
-
-    @Override
-    public String[] getLocations() {
-      boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, 
false);
-      if (!localityPreferred) {
-        return ANYWHERE;
-      }
-      if (locations != null) {
-        return locations;
-      }
-      locations = Util.blockLocations(task, conf);
-      return locations;
-    }
-
-    @Override
-    public void write(DataOutput out) throws IOException {
-      byte[] data = SerializationUtil.serializeToBytes(this.task);
-      out.writeInt(data.length);
-      out.write(data);
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      byte[] data = new byte[in.readInt()];
-      in.readFully(data);
-      this.task = SerializationUtil.deserializeFromBytes(data);
-    }
-  }
 }
diff --git a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
new file mode 100644
index 0000000..1e9a6b5
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplit.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.SerializationUtil;
+
+// Since this class extends `mapreduce.InputSplit` and implements 
`mapred.InputSplit`, it can be returned by both MR v1
+// and v2 file formats.
+public class IcebergSplit extends InputSplit implements 
org.apache.hadoop.mapred.InputSplit, IcebergSplitContainer {
+
+  public static final String[] ANYWHERE = new String[]{"*"};
+
+  private CombinedScanTask task;
+
+  private transient String[] locations;
+  private transient Configuration conf;
+
+  // public no-argument constructor for deserialization
+  public IcebergSplit() {}
+
+  IcebergSplit(Configuration conf, CombinedScanTask task) {
+    this.task = task;
+    this.conf = conf;
+  }
+
+  public CombinedScanTask task() {
+    return task;
+  }
+
+  @Override
+  public IcebergSplit icebergSplit() {
+    return this;
+  }
+
+  @Override
+  public long getLength() {
+    return task.files().stream().mapToLong(FileScanTask::length).sum();
+  }
+
+  @Override
+  public String[] getLocations() {
+    if (locations == null) {
+      boolean localityPreferred = conf.getBoolean(InputFormatConfig.LOCALITY, 
false);
+      locations = localityPreferred ? Util.blockLocations(task, conf) : 
ANYWHERE;
+    }
+
+    return locations;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    byte[] data = SerializationUtil.serializeToBytes(this.task);
+    out.writeInt(data.length);
+    out.write(data);
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    byte[] data = new byte[in.readInt()];
+    in.readFully(data);
+    this.task = SerializationUtil.deserializeFromBytes(data);
+  }
+}
diff --git 
a/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
new file mode 100644
index 0000000..c775437
--- /dev/null
+++ 
b/mr/src/main/java/org/apache/iceberg/mr/mapreduce/IcebergSplitContainer.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.mapreduce;
+
+public interface IcebergSplitContainer {
+
+  IcebergSplit icebergSplit();
+
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java 
b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
new file mode 100644
index 0000000..0c96891
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestHelper.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.RandomGenericData;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.rules.TemporaryFolder;
+
+public class TestHelper {
+
+  private final Configuration conf;
+  private final HadoopTables tables;
+  private final Schema schema;
+  private final PartitionSpec spec;
+  private final FileFormat fileFormat;
+  private final TemporaryFolder tmp;
+  private final File location;
+
+  private Table table;
+
+  public TestHelper(Configuration conf, HadoopTables tables, Schema schema, 
PartitionSpec spec, FileFormat fileFormat,
+                    TemporaryFolder tmp, File location) {
+    this.conf = conf;
+    this.tables = tables;
+    this.schema = schema;
+    this.spec = spec;
+    this.fileFormat = fileFormat;
+    this.tmp = tmp;
+    this.location = location;
+  }
+
+  private void setTable(Table table) {
+    this.table = table;
+    conf.set(InputFormatConfig.TABLE_SCHEMA, 
SchemaParser.toJson(table.schema()));
+  }
+
+  public Table getTable() {
+    return table;
+  }
+
+  public Table createTable(Schema theSchema, PartitionSpec theSpec) {
+    Map<String, String> properties = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+    Table tbl = tables.create(theSchema, theSpec, properties, 
location.toString());
+    setTable(tbl);
+    return tbl;
+  }
+
+  public Table createTable(Catalog catalog, TableIdentifier identifier) {
+    Map<String, String> properties = 
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, fileFormat.name());
+    Table tbl = catalog.createTable(identifier, schema, spec, properties);
+    setTable(tbl);
+    return tbl;
+  }
+
+  public Table createPartitionedTable() {
+    return createTable(schema, spec);
+  }
+
+  public Table createUnpartitionedTable() {
+    return createTable(schema, PartitionSpec.unpartitioned());
+  }
+
+
+  public List<Record> generateRandomRecords(int num, long seed) {
+    Preconditions.checkNotNull(table, "table not set");
+    return generateRandomRecords(table.schema(), num, seed);
+  }
+
+  public static List<Record> generateRandomRecords(Schema schema, int num, 
long seed) {
+    return RandomGenericData.generate(schema, num, seed);
+  }
+
+  public void appendToTable(DataFile... dataFiles) {
+    Preconditions.checkNotNull(table, "table not set");
+
+    AppendFiles append = table.newAppend();
+
+    for (DataFile dataFile : dataFiles) {
+      append = append.appendFile(dataFile);
+    }
+
+    append.commit();
+  }
+
+  public void appendToTable(StructLike partition, List<Record> records) throws 
IOException {
+    appendToTable(writeFile(partition, records));
+  }
+
+  public DataFile writeFile(StructLike partition, List<Record> records) throws 
IOException {
+    Preconditions.checkNotNull(table, "table not set");
+    return writeFile(table, partition, records, fileFormat, tmp.newFile());
+  }
+
+  public static DataFile writeFile(Table table, StructLike partition, 
List<Record> records, FileFormat fileFormat,
+                                   File file) throws IOException {
+    Assert.assertTrue(file.delete());
+
+    FileAppender<Record> appender;
+
+    switch (fileFormat) {
+      case AVRO:
+        appender = Avro.write(Files.localOutput(file))
+                .schema(table.schema())
+                .createWriterFunc(DataWriter::create)
+                .named(fileFormat.name())
+                .build();
+        break;
+
+      case PARQUET:
+        appender = Parquet.write(Files.localOutput(file))
+                .schema(table.schema())
+                .createWriterFunc(GenericParquetWriter::buildWriter)
+                .named(fileFormat.name())
+                .build();
+        break;
+
+      case ORC:
+        appender = ORC.write(Files.localOutput(file))
+                .schema(table.schema())
+                .createWriterFunc(GenericOrcWriter::buildWriter)
+                .build();
+        break;
+
+      default:
+        throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+    }
+
+    try {
+      appender.addAll(records);
+    } finally {
+      appender.close();
+    }
+
+    DataFiles.Builder builder = DataFiles.builder(table.spec())
+            .withPath(file.toString())
+            .withFormat(fileFormat)
+            .withFileSizeInBytes(file.length())
+            .withMetrics(appender.metrics());
+
+    if (partition != null) {
+      builder.withPartition(partition);
+    }
+
+    return builder.build();
+  }
+
+  public static class RecordsBuilder {
+
+    private final List<Record> records = new ArrayList<Record>();
+    private final Schema schema;
+
+    private RecordsBuilder(Schema schema) {
+      this.schema = schema;
+    }
+
+    public RecordsBuilder add(Object... values) {
+      Preconditions.checkArgument(schema.columns().size() == values.length);
+
+      GenericRecord record = GenericRecord.create(schema);
+
+      for (int i = 0; i < values.length; i++) {
+        record.set(i, values[i]);
+      }
+
+      records.add(record);
+      return this;
+    }
+
+    public List<Record> build() {
+      return Collections.unmodifiableList(records);
+    }
+
+    public static RecordsBuilder newInstance(Schema schema) {
+      return new RecordsBuilder(schema);
+    }
+  }
+}
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java 
b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
new file mode 100644
index 0000000..4470aec
--- /dev/null
+++ b/mr/src/test/java/org/apache/iceberg/mr/TestIcebergInputFormats.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.TestHelpers.Row;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.mapred.Container;
+import org.apache.iceberg.mr.mapred.MapredIcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergInputFormat;
+import org.apache.iceberg.mr.mapreduce.IcebergSplit;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(Parameterized.class)
+public class TestIcebergInputFormats {
+
+  private static final List<TestInputFormat.Factory<Record>> 
TESTED_INPUT_FORMATS = ImmutableList.of(
+          TestInputFormat.newFactory("IcebergInputFormat", 
TestIcebergInputFormat::create),
+          TestInputFormat.newFactory("MapredIcebergInputFormat", 
TestMapredIcebergInputFormat::create));
+
+  private static final List<String> TESTED_FILE_FORMATS = 
ImmutableList.of("avro", "orc", "parquet");
+
+  private static final Schema SCHEMA = new Schema(
+          required(1, "data", Types.StringType.get()),
+          required(2, "id", Types.LongType.get()),
+          required(3, "date", Types.StringType.get()));
+
+  private static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
+          .identity("date")
+          .bucket("id", 1)
+          .build();
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  // before variables
+  private Configuration conf;
+  private TestHelper helper;
+  private InputFormatConfig.ConfigBuilder builder;
+
+  // parametrized variables
+  private final TestInputFormat.Factory<Record> testInputFormat;
+  private final FileFormat fileFormat;
+
+  @Before
+  public void before() throws IOException {
+    conf = new Configuration();
+    HadoopTables tables = new HadoopTables(conf);
+
+    File location = temp.newFolder(testInputFormat.name(), fileFormat.name());
+    Assert.assertTrue(location.delete());
+
+    helper = new TestHelper(conf, tables, SCHEMA, SPEC, fileFormat, temp, 
location);
+    builder = new InputFormatConfig.ConfigBuilder(conf).readFrom(location);
+  }
+
+  @Parameterized.Parameters
+  public static Object[][] parameters() {
+    Object[][] parameters = new Object[TESTED_INPUT_FORMATS.size() * 
TESTED_FILE_FORMATS.size()][2];
+
+    int idx = 0;
+
+    for (TestInputFormat.Factory<Record> inputFormat : TESTED_INPUT_FORMATS) {
+      for (String fileFormat : TESTED_FILE_FORMATS) {
+        parameters[idx++] = new Object[] {inputFormat, fileFormat};
+      }
+    }
+
+    return parameters;
+  }
+
+  public TestIcebergInputFormats(TestInputFormat.Factory<Record> 
testInputFormat, String fileFormat) {
+    this.testInputFormat = testInputFormat;
+    this.fileFormat = 
FileFormat.valueOf(fileFormat.toUpperCase(Locale.ENGLISH));
+  }
+
+  @Test
+  public void testUnpartitionedTable() throws Exception {
+    helper.createUnpartitionedTable();
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    helper.appendToTable(null, expectedRecords);
+
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+  }
+
+  @Test
+  public void testPartitionedTable() throws Exception {
+    helper.createPartitionedTable();
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+  }
+
+  @Test
+  public void testFilterExp() throws Exception {
+    helper.createPartitionedTable();
+
+    List<Record> expectedRecords = helper.generateRandomRecords(2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+
+    DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0), 
expectedRecords);
+    DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0), 
helper.generateRandomRecords(2, 0L));
+    helper.appendToTable(dataFile1, dataFile2);
+
+    builder.filter(Expressions.equal("date", "2020-03-20"));
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+  }
+
+  @Test
+  public void testResiduals() throws Exception {
+    helper.createPartitionedTable();
+
+    List<Record> writeRecords = helper.generateRandomRecords(2, 0L);
+    writeRecords.get(0).set(1, 123L);
+    writeRecords.get(0).set(2, "2020-03-20");
+    writeRecords.get(1).set(1, 456L);
+    writeRecords.get(1).set(2, "2020-03-20");
+
+    List<Record> expectedRecords = new ArrayList<>();
+    expectedRecords.add(writeRecords.get(0));
+
+    DataFile dataFile1 = helper.writeFile(Row.of("2020-03-20", 0), 
writeRecords);
+    DataFile dataFile2 = helper.writeFile(Row.of("2020-03-21", 0), 
helper.generateRandomRecords(2, 0L));
+    helper.appendToTable(dataFile1, dataFile2);
+
+    builder.filter(Expressions.and(
+            Expressions.equal("date", "2020-03-20"),
+            Expressions.equal("id", 123)));
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+
+    // skip residual filtering
+    builder.skipResidualFiltering();
+    testInputFormat.create(builder.conf()).validate(writeRecords);
+  }
+
+  @Test
+  public void testFailedResidualFiltering() throws Exception {
+    helper.createPartitionedTable();
+
+    List<Record> expectedRecords = helper.generateRandomRecords(2, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    expectedRecords.get(1).set(2, "2020-03-20");
+
+    helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+    builder.useHiveRows()
+           .filter(Expressions.and(
+                   Expressions.equal("date", "2020-03-20"),
+                   Expressions.equal("id", 0)));
+
+    AssertHelpers.assertThrows(
+        "Residuals are not evaluated today for Iceberg Generics In memory 
model of HIVE",
+        UnsupportedOperationException.class, "Filter expression 
ref(name=\"id\") == 0 is not completely satisfied.",
+        () -> testInputFormat.create(builder.conf()));
+
+    builder.usePigTuples();
+
+    AssertHelpers.assertThrows(
+        "Residuals are not evaluated today for Iceberg Generics In memory 
model of PIG",
+        UnsupportedOperationException.class, "Filter expression 
ref(name=\"id\") == 0 is not completely satisfied.",
+        () -> testInputFormat.create(builder.conf()));
+  }
+
+  @Test
+  public void testProjection() throws Exception {
+    helper.createPartitionedTable();
+    List<Record> inputRecords = helper.generateRandomRecords(1, 0L);
+    helper.appendToTable(Row.of("2020-03-20", 0), inputRecords);
+
+    Schema projection = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
+    builder.project(projection);
+
+    List<Record> outputRecords = 
testInputFormat.create(builder.conf()).getRecords();
+
+    Assert.assertEquals(inputRecords.size(), outputRecords.size());
+    Assert.assertEquals(projection.asStruct(), outputRecords.get(0).struct());
+  }
+
+  private static final Schema LOG_SCHEMA = new Schema(
+          Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+          Types.NestedField.optional(2, "date", Types.StringType.get()),
+          Types.NestedField.optional(3, "level", Types.StringType.get()),
+          Types.NestedField.optional(4, "message", Types.StringType.get())
+  );
+
+  private static final PartitionSpec IDENTITY_PARTITION_SPEC =
+          
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
+
+  @Test
+  public void testIdentityPartitionProjections() throws Exception {
+    helper.createTable(LOG_SCHEMA, IDENTITY_PARTITION_SPEC);
+    List<Record> inputRecords = helper.generateRandomRecords(10, 0L);
+
+    Integer idx = 0;
+    AppendFiles append = helper.getTable().newAppend();
+    for (Record record : inputRecords) {
+      record.set(1, "2020-03-2" + idx);
+      record.set(2, idx.toString());
+      append.appendFile(helper.writeFile(Row.of("2020-03-2" + idx, 
idx.toString()), ImmutableList.of(record)));
+      idx += 1;
+    }
+    append.commit();
+
+    // individual fields
+    validateIdentityPartitionProjections(withColumns("date"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("level"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("message"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("id"), inputRecords);
+    // field pairs
+    validateIdentityPartitionProjections(withColumns("date", "message"), 
inputRecords);
+    validateIdentityPartitionProjections(withColumns("level", "message"), 
inputRecords);
+    validateIdentityPartitionProjections(withColumns("date", "level"), 
inputRecords);
+    // out-of-order pairs
+    validateIdentityPartitionProjections(withColumns("message", "date"), 
inputRecords);
+    validateIdentityPartitionProjections(withColumns("message", "level"), 
inputRecords);
+    validateIdentityPartitionProjections(withColumns("level", "date"), 
inputRecords);
+    // full projection
+    validateIdentityPartitionProjections(LOG_SCHEMA, inputRecords);
+    // out-of-order triplets
+    validateIdentityPartitionProjections(withColumns("date", "level", 
"message"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("level", "date", 
"message"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("date", "message", 
"level"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("level", "message", 
"date"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("message", "date", 
"level"), inputRecords);
+    validateIdentityPartitionProjections(withColumns("message", "level", 
"date"), inputRecords);
+  }
+
+  private static Schema withColumns(String... names) {
+    Map<String, Integer> indexByName = 
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
+    Set<Integer> projectedIds = Sets.newHashSet();
+    for (String name : names) {
+      projectedIds.add(indexByName.get(name));
+    }
+    return TypeUtil.select(LOG_SCHEMA, projectedIds);
+  }
+
+  private void validateIdentityPartitionProjections(Schema projectedSchema, 
List<Record> inputRecords) {
+    builder.project(projectedSchema);
+    List<Record> actualRecords = 
testInputFormat.create(builder.conf()).getRecords();
+
+    Set<String> fieldNames = 
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
+
+    for (int pos = 0; pos < inputRecords.size(); pos++) {
+      Record inputRecord = inputRecords.get(pos);
+      Record actualRecord = actualRecords.get(pos);
+      Assert.assertEquals("Projected schema should match", 
projectedSchema.asStruct(), actualRecord.struct());
+
+      for (String name : fieldNames) {
+        Assert.assertEquals(
+                "Projected field " + name + " should match", 
inputRecord.getField(name), actualRecord.getField(name));
+      }
+    }
+  }
+
+  @Test
+  public void testSnapshotReads() throws Exception {
+    helper.createUnpartitionedTable();
+
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    helper.appendToTable(null, expectedRecords);
+    long snapshotId = helper.getTable().currentSnapshot().snapshotId();
+
+    helper.appendToTable(null, helper.generateRandomRecords(1, 0L));
+
+    builder.snapshotId(snapshotId);
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+  }
+
+  @Test
+  public void testLocality() throws Exception {
+    helper.createUnpartitionedTable();
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    helper.appendToTable(null, expectedRecords);
+
+    for (InputSplit split : 
testInputFormat.create(builder.conf()).getSplits()) {
+      Assert.assertArrayEquals(IcebergSplit.ANYWHERE, split.getLocations());
+    }
+
+    builder.preferLocality();
+
+    for (InputSplit split : 
testInputFormat.create(builder.conf()).getSplits()) {
+      Assert.assertArrayEquals(new String[]{"localhost"}, 
split.getLocations());
+    }
+  }
+
+  public static class HadoopCatalogLoader implements Function<Configuration, 
Catalog> {
+    @Override
+    public Catalog apply(Configuration conf) {
+      return new HadoopCatalog(conf, conf.get("warehouse.location"));
+    }
+  }
+
+  @Test
+  public void testCustomCatalog() throws IOException {
+    conf.set("warehouse.location", 
temp.newFolder("hadoop_catalog").getAbsolutePath());
+
+    Catalog catalog = new HadoopCatalogLoader().apply(conf);
+    TableIdentifier identifier = TableIdentifier.of("db", "t");
+    helper.createTable(catalog, identifier);
+
+    List<Record> expectedRecords = helper.generateRandomRecords(1, 0L);
+    expectedRecords.get(0).set(2, "2020-03-20");
+    helper.appendToTable(Row.of("2020-03-20", 0), expectedRecords);
+
+    builder.catalogFunc(HadoopCatalogLoader.class)
+           .readFrom(identifier);
+
+    testInputFormat.create(builder.conf()).validate(expectedRecords);
+  }
+
+  private abstract static class TestInputFormat<T> {
+
+    private final List<IcebergSplit> splits;
+    private final List<T> records;
+
+    private TestInputFormat(List<IcebergSplit> splits, List<T> records) {
+      this.splits = splits;
+      this.records = records;
+    }
+
+    public List<T> getRecords() {
+      return records;
+    }
+
+    public List<IcebergSplit> getSplits() {
+      return splits;
+    }
+
+    public void validate(List<T> expected) {
+      Assert.assertEquals(expected, records);
+    }
+
+    public interface Factory<T> {
+      String name();
+      TestInputFormat<T> create(Configuration conf);
+    }
+
+    public static <T> Factory<T> newFactory(String name, 
Function<Configuration, TestInputFormat<T>> function) {
+      return new Factory<T>() {
+        @Override
+        public String name() {
+          return name;
+        }
+
+        @Override
+        public TestInputFormat<T> create(Configuration conf) {
+          return function.apply(conf);
+        }
+      };
+    }
+  }
+
+  private static final class TestMapredIcebergInputFormat<T> extends 
TestInputFormat<T> {
+
+    private TestMapredIcebergInputFormat(List<IcebergSplit> splits, List<T> 
records) {
+      super(splits, records);
+    }
+
+    private static <T> TestMapredIcebergInputFormat<T> create(Configuration 
conf) {
+      JobConf job = new JobConf(conf);
+      MapredIcebergInputFormat<T> inputFormat = new 
MapredIcebergInputFormat<>();
+
+      try {
+        org.apache.hadoop.mapred.InputSplit[] splits = 
inputFormat.getSplits(job, 1);
+
+        List<IcebergSplit> iceSplits = new ArrayList<>(splits.length);
+        List<T> records = new ArrayList<>();
+
+        for (org.apache.hadoop.mapred.InputSplit split : splits) {
+          iceSplits.add((IcebergSplit) split);
+          org.apache.hadoop.mapred.RecordReader<Void, Container<T>>
+                  reader = inputFormat.getRecordReader(split, job, 
Reporter.NULL);
+
+          try {
+            Container<T> container = reader.createValue();
+
+            while (reader.next(null, container)) {
+              records.add(container.get());
+            }
+          } finally {
+            reader.close();
+          }
+        }
+
+        return new TestMapredIcebergInputFormat<>(iceSplits, records);
+      } catch (IOException ioe) {
+        throw new UncheckedIOException(ioe);
+      }
+    }
+  }
+
+  private static final class TestIcebergInputFormat<T> extends 
TestInputFormat<T> {
+
+    private TestIcebergInputFormat(List<IcebergSplit> splits, List<T> records) 
{
+      super(splits, records);
+    }
+
+    private static <T> TestIcebergInputFormat<T> create(Configuration conf) {
+      TaskAttemptContext context = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
+      IcebergInputFormat<T> inputFormat = new IcebergInputFormat<>();
+      List<InputSplit> splits = inputFormat.getSplits(context);
+
+      List<IcebergSplit> iceSplits = new ArrayList<>(splits.size());
+      List<T> records = new ArrayList<>();
+
+      for (InputSplit split : splits) {
+        iceSplits.add((IcebergSplit) split);
+
+        try (RecordReader<Void, T> reader = 
inputFormat.createRecordReader(split, context)) {
+          reader.initialize(split, context);
+
+          while (reader.nextKeyValue()) {
+            records.add(reader.getCurrentValue());
+          }
+        } catch (InterruptedException ie) {
+          throw new RuntimeException(ie);
+        } catch (IOException ioe) {
+          throw new UncheckedIOException(ioe);
+        }
+      }
+
+      return new TestIcebergInputFormat<>(iceSplits, records);
+    }
+  }
+}
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
new file mode 100644
index 0000000..6443d45
--- /dev/null
+++ 
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergInputFormat.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.StandaloneHiveRunner;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.mr.TestHelper;
+import org.apache.iceberg.mr.mapred.IcebergSerDe;
+import org.apache.iceberg.types.Types;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+
+@RunWith(StandaloneHiveRunner.class)
+public class TestHiveIcebergInputFormat {
+
+  @HiveSQL(files = {}, autoStart = true)
+  private HiveShell shell;
+
+  @Rule
+  public TemporaryFolder temp = new TemporaryFolder();
+
+  private static final Schema CUSTOMER_SCHEMA = new Schema(
+          required(1, "customer_id", Types.LongType.get()),
+          required(2, "first_name", Types.StringType.get())
+  );
+
+  private static final List<Record> CUSTOMER_RECORDS = 
TestHelper.RecordsBuilder.newInstance(CUSTOMER_SCHEMA)
+          .add(0L, "Alice")
+          .add(1L, "Bob")
+          .add(2L, "Trudy")
+          .build();
+
+  private static final Schema ORDER_SCHEMA = new Schema(
+          required(1, "order_id", Types.LongType.get()),
+          required(2, "customer_id", Types.LongType.get()),
+          required(3, "total", Types.DoubleType.get()));
+
+  private static final List<Record> ORDER_RECORDS = 
TestHelper.RecordsBuilder.newInstance(ORDER_SCHEMA)
+          .add(100L, 0L, 11.11d)
+          .add(101L, 0L, 22.22d)
+          .add(102L, 1L, 33.33d)
+          .build();
+
+  // before variables
+  private HadoopTables tables;
+  private Table customerTable;
+  private Table orderTable;
+
+  @Before
+  public void before() throws IOException {
+    Configuration conf = new Configuration();
+    tables = new HadoopTables(conf);
+
+    File customerLocation = temp.newFolder("customers");
+    Assert.assertTrue(customerLocation.delete());
+
+    TestHelper customerHelper = new TestHelper(
+            conf, tables, CUSTOMER_SCHEMA, PartitionSpec.unpartitioned(), 
FileFormat.PARQUET, temp, customerLocation);
+
+    customerTable = customerHelper.createUnpartitionedTable();
+    customerHelper.appendToTable(customerHelper.writeFile(null, 
CUSTOMER_RECORDS));
+
+    File orderLocation = temp.newFolder("orders");
+    Assert.assertTrue(orderLocation.delete());
+
+    TestHelper orderHelper = new TestHelper(
+            conf, tables, ORDER_SCHEMA, PartitionSpec.unpartitioned(), 
FileFormat.PARQUET, temp, orderLocation);
+
+    orderTable = orderHelper.createUnpartitionedTable();
+    orderHelper.appendToTable(orderHelper.writeFile(null, ORDER_RECORDS));
+  }
+
+  @Test
+  public void testScanEmptyTable() throws IOException {
+    File emptyLocation = temp.newFolder("empty");
+    Assert.assertTrue(emptyLocation.delete());
+
+    Schema emptySchema = new Schema(required(1, "empty", 
Types.StringType.get()));
+    Table emptyTable = tables.create(
+            emptySchema, PartitionSpec.unpartitioned(), 
Collections.emptyMap(), emptyLocation.toString());
+    createHiveTable("empty", emptyTable.location());
+
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM 
default.empty");
+    Assert.assertEquals(0, rows.size());
+  }
+
+  @Test
+  public void testScanTable() {
+    createHiveTable("customers", customerTable.location());
+
+    // Single fetch task: no MR job.
+    List<Object[]> rows = shell.executeStatement("SELECT * FROM 
default.customers");
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, rows.get(0));
+    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, rows.get(1));
+    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, rows.get(2));
+
+    // Adding the ORDER BY clause will cause Hive to spawn a local MR job this 
time.
+    List<Object[]> descRows = shell.executeStatement("SELECT * FROM 
default.customers ORDER BY customer_id DESC");
+
+    Assert.assertEquals(3, rows.size());
+    Assert.assertArrayEquals(new Object[] {2L, "Trudy"}, descRows.get(0));
+    Assert.assertArrayEquals(new Object[] {1L, "Bob"}, descRows.get(1));
+    Assert.assertArrayEquals(new Object[] {0L, "Alice"}, descRows.get(2));
+  }
+
+  @Test
+  public void testJoinTables() {
+    createHiveTable("customers", customerTable.location());
+    createHiveTable("orders", orderTable.location());
+
+    List<Object[]> rows = shell.executeStatement(
+            "SELECT c.customer_id, c.first_name, o.order_id, o.total " +
+                    "FROM default.customers c JOIN default.orders o ON 
c.customer_id = o.customer_id " +
+                    "ORDER BY c.customer_id, o.order_id"
+    );
+
+    Assert.assertArrayEquals(new Object[] {0L, "Alice", 100L, 11.11d}, 
rows.get(0));
+    Assert.assertArrayEquals(new Object[] {0L, "Alice", 101L, 22.22d}, 
rows.get(1));
+    Assert.assertArrayEquals(new Object[] {1L, "Bob", 102L, 33.33d}, 
rows.get(2));
+  }
+
+  private void createHiveTable(String table, String location) {
+    shell.execute(String.format(
+            "CREATE TABLE default.%s " +
+            "ROW FORMAT SERDE '%s' " +
+            "STORED AS " +
+                "INPUTFORMAT '%s' " +
+                "OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat' " +
+            "LOCATION '%s'",
+            table, IcebergSerDe.class.getName(), 
HiveIcebergInputFormat.class.getName(), location));
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java 
b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
deleted file mode 100644
index 0d8cdc0..0000000
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestHelpers.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.iceberg.mr.mapred;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.types.Types;
-
-import static org.apache.iceberg.types.Types.NestedField.optional;
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-public class TestHelpers {
-
-  private TestHelpers() {}
-
-  public static DataFile writeFile(File targetFile, Table table, StructLike 
partitionData, FileFormat fileFormat,
-                                   List<Record> records) throws IOException {
-    if (targetFile.exists()) {
-      if (!targetFile.delete()) {
-        throw new IOException("Unable to delete " + 
targetFile.getAbsolutePath());
-      }
-    }
-    FileAppender<Record> appender;
-    switch (fileFormat) {
-      case AVRO:
-        appender = Avro.write(Files.localOutput(targetFile))
-            .schema(table.schema())
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build();
-        break;
-      case PARQUET:
-        appender = Parquet.write(Files.localOutput(targetFile))
-            .schema(table.schema())
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .named(fileFormat.name())
-            .build();
-        break;
-      case ORC:
-        appender = ORC.write(Files.localOutput(targetFile))
-            .schema(table.schema())
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .build();
-        break;
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
-    }
-
-    try {
-      appender.addAll(records);
-    } finally {
-      appender.close();
-    }
-
-    DataFiles.Builder builder = DataFiles.builder(table.spec())
-        .withPath(targetFile.toString())
-        .withFormat(fileFormat)
-        .withFileSizeInBytes(targetFile.length())
-        .withMetrics(appender.metrics());
-    if (partitionData != null) {
-      builder.withPartition(partitionData);
-    }
-    return builder.build();
-  }
-
-  public static Record createSimpleRecord(long id, String data) {
-    Schema schema = new Schema(required(1, "id", Types.StringType.get()),
-        optional(2, "data", Types.LongType.get()));
-    GenericRecord record = GenericRecord.create(schema);
-    record.setField("id", id);
-    record.setField("data", data);
-    return record;
-  }
-
-  public static Record createCustomRecord(Schema schema, Object... dataValues) 
{
-    GenericRecord record = GenericRecord.create(schema);
-    List<Types.NestedField> fields = schema.columns();
-    for (int i = 0; i < fields.size(); i++) {
-      record.set(i, dataValues[i]);
-    }
-    return record;
-  }
-
-}
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java 
b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
index 937ddba..a2fb90d 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/mapred/TestIcebergSerDe.java
@@ -72,9 +72,10 @@ public class TestIcebergSerDe {
     IcebergSerDe serDe = new IcebergSerDe();
 
     Record record = RandomGenericData.generate(schema, 1, 0).get(0);
-    IcebergWritable writable = new IcebergWritable(record, schema);
+    Container<Record> container = new Container<>();
+    container.set(record);
 
-    Assert.assertEquals(record, serDe.deserialize(writable));
+    Assert.assertEquals(record, serDe.deserialize(container));
   }
 
 }
diff --git 
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java 
b/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
deleted file mode 100644
index 05842f8..0000000
--- 
a/mr/src/test/java/org/apache/iceberg/mr/mapreduce/TestIcebergInputFormat.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.mr.mapreduce;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.function.Function;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.iceberg.AppendFiles;
-import org.apache.iceberg.AssertHelpers;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.StructLike;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.TestHelpers.Row;
-import org.apache.iceberg.avro.Avro;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.RandomGenericData;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.data.avro.DataWriter;
-import org.apache.iceberg.data.orc.GenericOrcWriter;
-import org.apache.iceberg.data.parquet.GenericParquetWriter;
-import org.apache.iceberg.expressions.Expressions;
-import org.apache.iceberg.hadoop.HadoopCatalog;
-import org.apache.iceberg.hadoop.HadoopTables;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.mr.InputFormatConfig;
-import org.apache.iceberg.orc.ORC;
-import org.apache.iceberg.parquet.Parquet;
-import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
-import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.types.Types;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.apache.iceberg.types.Types.NestedField.required;
-
-@RunWith(Parameterized.class)
-public class TestIcebergInputFormat {
-  static final Schema SCHEMA = new Schema(
-      required(1, "data", Types.StringType.get()),
-      required(2, "id", Types.LongType.get()),
-      required(3, "date", Types.StringType.get()));
-
-  static final PartitionSpec SPEC = PartitionSpec.builderFor(SCHEMA)
-      .identity("date")
-      .bucket("id", 1)
-      .build();
-
-  @Rule
-  public TemporaryFolder temp = new TemporaryFolder();
-  private HadoopTables tables;
-  private Configuration conf;
-
-  @Parameterized.Parameters
-  public static Object[][] parameters() {
-    return new Object[][]{
-        new Object[]{"parquet"},
-        new Object[]{"avro"},
-        new Object[]{"orc"}
-    };
-  }
-
-  private final FileFormat format;
-
-  public TestIcebergInputFormat(String format) {
-    this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
-  }
-
-  @Before
-  public void before() {
-    conf = new Configuration();
-    tables = new HadoopTables(conf);
-  }
-
-  @Test
-  public void testUnpartitionedTable() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
-    DataFile dataFile = writeFile(table, null, format, expectedRecords);
-    table.newAppend()
-         .appendFile(dataFile)
-         .commit();
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder.readFrom(location.toString()).schema(table.schema());
-    validate(job, expectedRecords);
-  }
-
-  @Test
-  public void testPartitionedTable() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, SPEC,
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
-    expectedRecords.get(0).set(2, "2020-03-20");
-    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
-    table.newAppend()
-         .appendFile(dataFile)
-         .commit();
-
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder.readFrom(location.toString()).schema(table.schema());
-    validate(job, expectedRecords);
-  }
-
-  @Test
-  public void testFilterExp() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, SPEC,
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
2, 0L);
-    expectedRecords.get(0).set(2, "2020-03-20");
-    expectedRecords.get(1).set(2, "2020-03-20");
-    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
-    DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
-                                   RandomGenericData.generate(table.schema(), 
2, 0L));
-    table.newAppend()
-         .appendFile(dataFile1)
-         .appendFile(dataFile2)
-         .commit();
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder.readFrom(location.toString())
-            .schema(table.schema())
-            .filter(Expressions.equal("date", "2020-03-20"));
-    validate(job, expectedRecords);
-  }
-
-  @Test
-  public void testResiduals() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, SPEC,
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> writeRecords = RandomGenericData.generate(table.schema(), 2, 
0L);
-    writeRecords.get(0).set(1, 123L);
-    writeRecords.get(0).set(2, "2020-03-20");
-    writeRecords.get(1).set(1, 456L);
-    writeRecords.get(1).set(2, "2020-03-20");
-
-    List<Record> expectedRecords = new ArrayList<>();
-    expectedRecords.add(writeRecords.get(0));
-
-    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, 
writeRecords);
-    DataFile dataFile2 = writeFile(table, Row.of("2020-03-21", 0), format,
-        RandomGenericData.generate(table.schema(), 2, 0L));
-    table.newAppend()
-         .appendFile(dataFile1)
-         .appendFile(dataFile2)
-         .commit();
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder.readFrom(location.toString())
-            .schema(table.schema())
-            .filter(Expressions.and(
-                    Expressions.equal("date", "2020-03-20"),
-                    Expressions.equal("id", 123)));
-    validate(job, expectedRecords);
-
-    // skip residual filtering
-    job = Job.getInstance(conf);
-    configBuilder = IcebergInputFormat.configure(job);
-    configBuilder.skipResidualFiltering().readFrom(location.toString())
-            .schema(table.schema())
-            .filter(Expressions.and(
-                    Expressions.equal("date", "2020-03-20"),
-                    Expressions.equal("id", 123)));
-    validate(job, writeRecords);
-  }
-
-  @Test
-  public void testFailedResidualFiltering() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, SPEC,
-        ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-        location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
2, 0L);
-    expectedRecords.get(0).set(2, "2020-03-20");
-    expectedRecords.get(1).set(2, "2020-03-20");
-
-    DataFile dataFile1 = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
-    table.newAppend()
-        .appendFile(dataFile1)
-        .commit();
-
-    Job jobShouldFail1 = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(jobShouldFail1);
-    configBuilder.useHiveRows().readFrom(location.toString())
-        .schema(table.schema())
-        .filter(Expressions.and(
-            Expressions.equal("date", "2020-03-20"),
-            Expressions.equal("id", 0)));
-    AssertHelpers.assertThrows(
-        "Residuals are not evaluated today for Iceberg Generics In memory 
model of HIVE",
-        UnsupportedOperationException.class, "Filter expression 
ref(name=\"id\") == 0 is not completely satisfied.",
-        () -> validate(jobShouldFail1, expectedRecords));
-
-    Job jobShouldFail2 = Job.getInstance(conf);
-    configBuilder = IcebergInputFormat.configure(jobShouldFail2);
-    configBuilder.usePigTuples().readFrom(location.toString())
-        .schema(table.schema())
-        .filter(Expressions.and(
-            Expressions.equal("date", "2020-03-20"),
-            Expressions.equal("id", 0)));
-    AssertHelpers.assertThrows(
-        "Residuals are not evaluated today for Iceberg Generics In memory 
model of PIG",
-        UnsupportedOperationException.class, "Filter expression 
ref(name=\"id\") == 0 is not completely satisfied.",
-        () -> validate(jobShouldFail2, expectedRecords));
-  }
-
-  @Test
-  public void testProjection() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Schema projectedSchema = TypeUtil.select(SCHEMA, ImmutableSet.of(1));
-    Table table = tables.create(SCHEMA, SPEC,
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> inputRecords = RandomGenericData.generate(table.schema(), 1, 
0L);
-    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
inputRecords);
-    table.newAppend()
-         .appendFile(dataFile)
-         .commit();
-
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder
-        .schema(table.schema())
-        .readFrom(location.toString())
-        .project(projectedSchema);
-    List<Record> outputRecords = readRecords(job.getConfiguration());
-    Assert.assertEquals(inputRecords.size(), outputRecords.size());
-    Assert.assertEquals(projectedSchema.asStruct(), 
outputRecords.get(0).struct());
-  }
-
-  private static final Schema LOG_SCHEMA = new Schema(
-      Types.NestedField.optional(1, "id", Types.IntegerType.get()),
-      Types.NestedField.optional(2, "date", Types.StringType.get()),
-      Types.NestedField.optional(3, "level", Types.StringType.get()),
-      Types.NestedField.optional(4, "message", Types.StringType.get())
-  );
-
-  private static final PartitionSpec IDENTITY_PARTITION_SPEC =
-      
PartitionSpec.builderFor(LOG_SCHEMA).identity("date").identity("level").build();
-
-  @Test
-  public void testIdentityPartitionProjections() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(LOG_SCHEMA, IDENTITY_PARTITION_SPEC,
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-
-    List<Record> inputRecords = RandomGenericData.generate(LOG_SCHEMA, 10, 0);
-    Integer idx = 0;
-    AppendFiles append = table.newAppend();
-    for (Record record : inputRecords) {
-      record.set(1, "2020-03-2" + idx);
-      record.set(2, idx.toString());
-      append.appendFile(writeFile(table, Row.of("2020-03-2" + idx, 
idx.toString()), format, ImmutableList.of(record)));
-      idx += 1;
-    }
-    append.commit();
-
-    // individual fields
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("date"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("level"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("message"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("id"), inputRecords);
-    // field pairs
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("date", "message"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("level", "message"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("date", "level"), inputRecords);
-    // out-of-order pairs
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("message", "date"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("message", "level"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("level", "date"), inputRecords);
-    // full projection
-    validateIdentityPartitionProjections(location.toString(), table.schema(), 
LOG_SCHEMA, inputRecords);
-    // out-of-order triplets
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("date", "level", "message"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("level", "date", "message"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("date", "message", "level"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("level", "message", "date"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("message", "date", "level"), inputRecords);
-    validateIdentityPartitionProjections(location.toString(), table.schema(),
-            withColumns("message", "level", "date"), inputRecords);
-  }
-
-  private static Schema withColumns(String... names) {
-    Map<String, Integer> indexByName = 
TypeUtil.indexByName(LOG_SCHEMA.asStruct());
-    Set<Integer> projectedIds = Sets.newHashSet();
-    for (String name : names) {
-      projectedIds.add(indexByName.get(name));
-    }
-    return TypeUtil.select(LOG_SCHEMA, projectedIds);
-  }
-
-  private void validateIdentityPartitionProjections(
-      String tablePath, Schema tableSchema, Schema projectedSchema, 
List<Record> inputRecords) throws Exception {
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder
-        .schema(tableSchema)
-        .readFrom(tablePath)
-        .project(projectedSchema);
-    List<Record> actualRecords = readRecords(job.getConfiguration());
-
-    Set<String> fieldNames = 
TypeUtil.indexByName(projectedSchema.asStruct()).keySet();
-    for (int pos = 0; pos < inputRecords.size(); pos++) {
-      Record inputRecord = inputRecords.get(pos);
-      Record actualRecord = actualRecords.get(pos);
-      Assert.assertEquals("Projected schema should match", 
projectedSchema.asStruct(), actualRecord.struct());
-      for (String name : fieldNames) {
-        Assert.assertEquals(
-            "Projected field " + name + " should match", 
inputRecord.getField(name), actualRecord.getField(name));
-      }
-    }
-  }
-
-  @Test
-  public void testSnapshotReads() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
-    table.newAppend()
-         .appendFile(writeFile(table, null, format, expectedRecords))
-         .commit();
-    long snapshotId = table.currentSnapshot().snapshotId();
-    table.newAppend()
-         .appendFile(writeFile(table, null, format, 
RandomGenericData.generate(table.schema(), 1, 0L)))
-         .commit();
-
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder
-        .schema(table.schema())
-        .readFrom(location.toString())
-        .snapshotId(snapshotId);
-
-    validate(job, expectedRecords);
-  }
-
-  @Test
-  public void testLocality() throws Exception {
-    File location = temp.newFolder(format.name());
-    Assert.assertTrue(location.delete());
-    Table table = tables.create(SCHEMA, PartitionSpec.unpartitioned(),
-                                
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()),
-                                location.toString());
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
-    table.newAppend()
-         .appendFile(writeFile(table, null, format, expectedRecords))
-         .commit();
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder.readFrom(location.toString()).schema(table.schema());
-
-    for (InputSplit split : splits(job.getConfiguration())) {
-      Assert.assertArrayEquals(IcebergInputFormat.IcebergSplit.ANYWHERE, 
split.getLocations());
-    }
-
-    configBuilder.preferLocality();
-    for (InputSplit split : splits(job.getConfiguration())) {
-      Assert.assertArrayEquals(new String[]{"localhost"}, 
split.getLocations());
-    }
-  }
-
-  public static class HadoopCatalogFunc implements Function<Configuration, 
Catalog> {
-    @Override
-    public Catalog apply(Configuration conf) {
-      return new HadoopCatalog(conf, conf.get("warehouse.location"));
-    }
-  }
-
-  @Test
-  public void testCustomCatalog() throws Exception {
-    conf = new Configuration();
-    conf.set("warehouse.location", 
temp.newFolder("hadoop_catalog").getAbsolutePath());
-
-    Catalog catalog = new HadoopCatalogFunc().apply(conf);
-    TableIdentifier tableIdentifier = TableIdentifier.of("db", "t");
-    Table table = catalog.createTable(tableIdentifier, SCHEMA, SPEC,
-                                      
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));
-    List<Record> expectedRecords = RandomGenericData.generate(table.schema(), 
1, 0L);
-    expectedRecords.get(0).set(2, "2020-03-20");
-    DataFile dataFile = writeFile(table, Row.of("2020-03-20", 0), format, 
expectedRecords);
-    table.newAppend()
-         .appendFile(dataFile)
-         .commit();
-
-    Job job = Job.getInstance(conf);
-    InputFormatConfig.ConfigBuilder configBuilder = 
IcebergInputFormat.configure(job);
-    configBuilder
-        .catalogFunc(HadoopCatalogFunc.class)
-        .schema(table.schema())
-        .readFrom(tableIdentifier.toString());
-    validate(job, expectedRecords);
-  }
-
-  private static void validate(Job job, List<Record> expectedRecords) {
-    List<Record> actualRecords = readRecords(job.getConfiguration());
-    Assert.assertEquals(expectedRecords, actualRecords);
-  }
-
-  private static <T> List<InputSplit> splits(Configuration conf) {
-    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
-    IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
-    return icebergInputFormat.getSplits(context);
-  }
-
-  private static <T> List<T> readRecords(Configuration conf) {
-    TaskAttemptContext context = new TaskAttemptContextImpl(conf, new 
TaskAttemptID());
-    IcebergInputFormat<T> icebergInputFormat = new IcebergInputFormat<>();
-    List<InputSplit> splits = icebergInputFormat.getSplits(context);
-    return
-        FluentIterable
-            .from(splits)
-            .transformAndConcat(split -> readRecords(icebergInputFormat, 
split, context))
-            .toList();
-  }
-
-  private static <T> Iterable<T> readRecords(
-      IcebergInputFormat<T> inputFormat, InputSplit split, TaskAttemptContext 
context) {
-    RecordReader<Void, T> recordReader = inputFormat.createRecordReader(split, 
context);
-    List<T> records = new ArrayList<>();
-    try {
-      recordReader.initialize(split, context);
-      while (recordReader.nextKeyValue()) {
-        records.add(recordReader.getCurrentValue());
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    return records;
-  }
-
-  private DataFile writeFile(
-      Table table, StructLike partitionData, FileFormat fileFormat, 
List<Record> records) throws IOException {
-    File file = temp.newFile();
-    Assert.assertTrue(file.delete());
-    FileAppender<Record> appender;
-    switch (fileFormat) {
-      case AVRO:
-        appender = Avro.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(DataWriter::create)
-            .named(fileFormat.name())
-            .build();
-        break;
-      case PARQUET:
-        appender = Parquet.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(GenericParquetWriter::buildWriter)
-            .named(fileFormat.name())
-            .build();
-        break;
-      case ORC:
-        appender = ORC.write(Files.localOutput(file))
-            .schema(table.schema())
-            .createWriterFunc(GenericOrcWriter::buildWriter)
-            .build();
-        break;
-      default:
-        throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
-    }
-
-    try {
-      appender.addAll(records);
-    } finally {
-      appender.close();
-    }
-
-    DataFiles.Builder builder = DataFiles.builder(table.spec())
-        .withPath(file.toString())
-        .withFormat(format)
-        .withFileSizeInBytes(file.length())
-        .withMetrics(appender.metrics());
-    if (partitionData != null) {
-      builder.withPartition(partitionData);
-    }
-    return builder.build();
-  }
-}

Reply via email to