pvary commented on a change in pull request #1407:
URL: https://github.com/apache/iceberg/pull/1407#discussion_r486825641



##########
File path: mr/src/main/java/org/apache/iceberg/mr/HiveSerDeConfig.java
##########
@@ -25,9 +25,9 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 
-public class InputFormatConfig {
+public final class HiveSerDeConfig {

Review comment:
       Added a TODO message for now.
   Closing this conversation.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,408 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_EXTENSION = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<String, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private String taskAttemptId = null;
+  private Schema schema = null;
+  private String location = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath,
+                                                           Class valueClass, 
boolean isCompressed,
+                                                           Properties 
tableAndSerDeProperties,
+                                                           Progressable 
progress)
+      throws IOException {
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = overlayedConf.get(TASK_ATTEMPT_ID_KEY);
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.location = generateTaskLocation(overlayedConf);
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+    fileData.remove(this.taskAttemptId);
+    return new IcebergRecordWriter();
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates query directory location based on the configuration.
+   * Currently it uses tableLocation/queryId
+   * @param conf The job's configuration
+   * @return The directory to store the query result files
+   */
+  private static String generateQueryLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    return tableLocation + "/" + queryId;
+  }
+
+  /**
+   * Generates file location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/taskAttemptId.
+   * @param conf The job's configuration
+   * @return The file to store the results
+   */
+  private static String generateTaskLocation(Configuration conf) {
+    String taskAttemptId = conf.get(TASK_ATTEMPT_ID_KEY);
+    return generateQueryLocation(conf) + "/" + taskAttemptId;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new IcebergRecordWriter();
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+          implements FileSinkOperator.RecordWriter,
+          org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
{
+
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter() throws IOException {
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+      ClosedFileData closedFileData = 
fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+      createCommittedFileFor(new 
HadoopFileIO(taskAttemptContext.getJobConf()), closedFileData);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getJobConf();
+      Path queryResultPath = new Path(generateQueryLocation(conf));
+      Table table = Catalogs.loadTable(conf);
+
+      ExecutorService executor = null;
+      try {
+        // Creating executor service for parallel handling of file reads and 
deletes
+        executor = Executors.newFixedThreadPool(
+            conf.getInt(InputFormatConfig.WRITE_THREAD_POOL_SIZE, 
InputFormatConfig.WRITE_THREAD_POOL_SIZE_DEFAULT),
+            new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setPriority(Thread.NORM_PRIORITY)
+                .setNameFormat("iceberg-commit-pool-%d")
+                .build());
+
+        Set<String> taskTmpFiles = new HashSet<>();
+        Set<Future<DataFile>> dataFiles = new HashSet<>();
+        FileSystem fs = Util.getFs(queryResultPath, conf);
+
+        // Listing the task result directory and reading .committed files
+        RemoteIterator<LocatedFileStatus> taskFileStatuses = 
fs.listFiles(queryResultPath, false);
+        while (taskFileStatuses.hasNext()) {
+          LocatedFileStatus taskFile = taskFileStatuses.next();
+          String taskFileName = queryResultPath + "/" + 
taskFile.getPath().getName();
+          taskTmpFiles.add(taskFileName);
+          if (taskFileName.endsWith(COMMITTED_EXTENSION)) {
+            dataFiles.add(executor.submit(() -> {
+              LOG.debug("Reading committed file {}", taskFileName);
+              ClosedFileData cfd = readCommittedFile(table.io(), taskFileName);
+              DataFiles.Builder builder = 
DataFiles.builder(PartitionSpec.unpartitioned())
+                  .withPath(cfd.fileName)
+                  .withFormat(cfd.fileFormat)
+                  .withFileSizeInBytes(cfd.length)
+                  .withMetrics(cfd.serializableMetrics.metrics());
+              DataFile dataFile = builder.build();
+              return dataFile;
+            }));
+          }
+        }
+
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();

Review comment:
       The current patch only aims to insert new data into the table (no 
delete/update etc. at the moment).
   @rdblue: Is it enough to add data with newAppend, or do we have to think 
about overwrite/replacePartitions too?
   Thanks,
   Peter

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,445 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_EXTENSION = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<TaskAttemptID, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private TaskAttemptID taskAttemptId = null;
+  private Schema schema = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath, Class valueClass,
+      boolean isCompressed, Properties tableAndSerDeProperties, Progressable 
progress) throws IOException {
+
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = 
TaskAttemptID.forName(overlayedConf.get(TASK_ATTEMPT_ID_KEY));
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+
+    fileData.remove(this.taskAttemptId);
+
+    return new IcebergRecordWriter(generateDataFileLocation(overlayedConf, 
taskAttemptId));
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates query directory location based on the configuration.
+   * Currently it uses tableLocation/queryId
+   * @param conf The job's configuration
+   * @return The directory to store the query result files
+   */
+  private static String generateQueryLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    return tableLocation + "/" + queryId;
+  }
+
+  /**
+   * Generates datafile location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/jobId/taskAttemptId.
+   * @param conf The job's configuration
+   * @param taskAttemptId The TaskAttemptID for the task
+   * @return The file to store the results
+   */
+  private static String generateDataFileLocation(Configuration conf, 
TaskAttemptID taskAttemptId) {
+    return generateQueryLocation(conf) + "/" + taskAttemptId.getJobID() + "/" 
+ taskAttemptId.toString();
+  }
+
+  /**
+   * Generates commit file location based on the task configuration and a 
specific task id.
+   * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].committed.
+   * @param conf The job's configuration
+   * @param jobId The jobId for the task
+   * @param taskId The taskId for the commit file
+   * @return The file to store the results
+   */
+  private static String generateCommitFileLocation(Configuration conf, JobID 
jobId, int taskId) {
+    return generateQueryLocation(conf) + "/" + jobId + "/task-" + taskId + 
COMMITTED_EXTENSION;
+  }
+
+  /**
+   * Generates commit file location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].committed.
+   * @param conf The job's configuration
+   * @param taskAttemptId The TaskAttemptID for the task
+   * @return The file to store the results
+   */
+  private static String generateCommitFileLocation(Configuration conf, 
TaskAttemptID taskAttemptId) {
+    return generateCommitFileLocation(conf, taskAttemptId.getJobID(), 
taskAttemptId.getTaskID().getId());
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) {
+
+    throw new UnsupportedOperationException("Please implement if needed");
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+      implements FileSinkOperator.RecordWriter, 
org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> {
+
+    private final String location;
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter(String location) throws IOException {
+      this.location = location;
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      ClosedFileData closedFileData = 
fileData.remove(context.getTaskAttemptID());
+      String commitFileLocation = 
generateCommitFileLocation(context.getJobConf(), context.getTaskAttemptID());
+
+      // If we created a new data file, then create the committed file for this
+      if (closedFileData != null) {
+        createCommittedFileFor(new HadoopFileIO(context.getJobConf()), 
closedFileData, commitFileLocation);
+      } else {
+        TaskType taskType = 
context.getTaskAttemptID().getTaskID().getTaskType();
+        boolean isWrite = 
context.getJobConf().getBoolean(HiveIcebergStorageHandler.WRITE_KEY, false);
+        boolean mapOnly = context.getJobConf().getNumReduceTasks() == 0;
+
+        // If we writing and the task is either reducer or a map in a map-only 
job then write an empty commit file
+        if (isWrite && (TaskType.REDUCE.equals(taskType) || 
(TaskType.MAP.equals(taskType) && mapOnly))) {
+          createCommittedFileFor(new HadoopFileIO(context.getJobConf()), new 
ClosedFileData(), commitFileLocation);
+        }
+      }
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) {
+      FileIO io = new HadoopFileIO(context.getJobConf());
+
+      // Clean up local cache for metadata
+      fileData.remove(context.getTaskAttemptID());
+
+      // Remove the result file for the failed task

Review comment:
       Added the abortTask handling

##########
File path: mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
##########
@@ -47,6 +47,9 @@ private InputFormatConfig() {
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = 
"iceberg.mr.catalog.hadoop.warehouse.location";
   public static final String CATALOG_LOADER_CLASS = 
"iceberg.mr.catalog.loader.class";
+  public static final String WRITE_FILE_FORMAT = 
"iceberg.mr.write.file.format";

Review comment:
       Added the TODO message.
   Thanks for reminding me!

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -271,28 +271,25 @@ public void setupTask(TaskAttemptContext 
taskAttemptContext) {
     }
 
     @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
-      return true;
+    public boolean needsTaskCommit(TaskAttemptContext context) {
+      // We need to commit if this is the last phase of a MapReduce process
+      return 
TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+          context.getJobConf().getNumReduceTasks() == 0;

Review comment:
       I tested this by adding the jars to the Hive master from the 
Apache repo and running the queries in local mode. I got an exception and 
started to investigate its source.
   
   The actual query I used for this had an ORDER BY, so it had both a 
Mapper and a Reducer phase:
   ```
   insert into purchases select * from purchases order by id;
   ```
   My debugging revealed that commitTask was called for Mappers as well as 
Reducers. The JobRunner infrastructure calls it for both, and it has no 
information about whether the task has actually written anything anywhere. 
I think this is the purpose of the _needsTaskCommit()_ method, so that the 
developer of the OutputCommitter can decide.
   
   @massdosage: Do you have an easy example for HiveRunner write tests?
   
   Thanks,
   Peter

##########
File path: mr/src/main/java/org/apache/iceberg/mr/HiveSerDeConfig.java
##########
@@ -25,9 +25,9 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 
-public class InputFormatConfig {
+public final class HiveSerDeConfig {

Review comment:
       Added a TODO message for now.
   Closing this conversation.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,408 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_EXTENSION = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<String, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private String taskAttemptId = null;
+  private Schema schema = null;
+  private String location = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath,
+                                                           Class valueClass, 
boolean isCompressed,
+                                                           Properties 
tableAndSerDeProperties,
+                                                           Progressable 
progress)
+      throws IOException {
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = overlayedConf.get(TASK_ATTEMPT_ID_KEY);
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.location = generateTaskLocation(overlayedConf);
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+    fileData.remove(this.taskAttemptId);
+    return new IcebergRecordWriter();
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates query directory location based on the configuration.
+   * Currently it uses tableLocation/queryId
+   * @param conf The job's configuration
+   * @return The directory to store the query result files
+   */
+  private static String generateQueryLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    return tableLocation + "/" + queryId;
+  }
+
+  /**
+   * Generates file location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/taskAttemptId.
+   * @param conf The job's configuration
+   * @return The file to store the results
+   */
+  private static String generateTaskLocation(Configuration conf) {
+    String taskAttemptId = conf.get(TASK_ATTEMPT_ID_KEY);
+    return generateQueryLocation(conf) + "/" + taskAttemptId;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new IcebergRecordWriter();
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+          implements FileSinkOperator.RecordWriter,
+          org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
{
+
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter() throws IOException {
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+      ClosedFileData closedFileData = 
fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+      createCommittedFileFor(new 
HadoopFileIO(taskAttemptContext.getJobConf()), closedFileData);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getJobConf();
+      Path queryResultPath = new Path(generateQueryLocation(conf));
+      Table table = Catalogs.loadTable(conf);
+
+      ExecutorService executor = null;
+      try {
+        // Creating executor service for parallel handling of file reads and 
deletes
+        executor = Executors.newFixedThreadPool(
+            conf.getInt(InputFormatConfig.WRITE_THREAD_POOL_SIZE, 
InputFormatConfig.WRITE_THREAD_POOL_SIZE_DEFAULT),
+            new ThreadFactoryBuilder()
+                .setDaemon(false)
+                .setPriority(Thread.NORM_PRIORITY)
+                .setNameFormat("iceberg-commit-pool-%d")
+                .build());
+
+        Set<String> taskTmpFiles = new HashSet<>();
+        Set<Future<DataFile>> dataFiles = new HashSet<>();
+        FileSystem fs = Util.getFs(queryResultPath, conf);
+
+        // Listing the task result directory and reading .committed files
+        RemoteIterator<LocatedFileStatus> taskFileStatuses = 
fs.listFiles(queryResultPath, false);
+        while (taskFileStatuses.hasNext()) {
+          LocatedFileStatus taskFile = taskFileStatuses.next();
+          String taskFileName = queryResultPath + "/" + 
taskFile.getPath().getName();
+          taskTmpFiles.add(taskFileName);
+          if (taskFileName.endsWith(COMMITTED_EXTENSION)) {
+            dataFiles.add(executor.submit(() -> {
+              LOG.debug("Reading committed file {}", taskFileName);
+              ClosedFileData cfd = readCommittedFile(table.io(), taskFileName);
+              DataFiles.Builder builder = 
DataFiles.builder(PartitionSpec.unpartitioned())
+                  .withPath(cfd.fileName)
+                  .withFormat(cfd.fileFormat)
+                  .withFileSizeInBytes(cfd.length)
+                  .withMetrics(cfd.serializableMetrics.metrics());
+              DataFile dataFile = builder.build();
+              return dataFile;
+            }));
+          }
+        }
+
+        // Appending data files to the table
+        AppendFiles append = table.newAppend();

Review comment:
       The current patch only aims to insert new data into the table (no 
delete/update etc. at the moment).
   @rdblue: Is it enough to add data with newAppend, or do we have to think 
about overwrite/replacePartitions too?
   Thanks,
   Peter

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,445 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.exceptions.NotFoundException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import 
org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.util.Tasks;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_EXTENSION = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<TaskAttemptID, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private TaskAttemptID taskAttemptId = null;
+  private Schema schema = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath, Class valueClass,
+      boolean isCompressed, Properties tableAndSerDeProperties, Progressable 
progress) throws IOException {
+
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = 
TaskAttemptID.forName(overlayedConf.get(TASK_ATTEMPT_ID_KEY));
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+
+    fileData.remove(this.taskAttemptId);
+
+    return new IcebergRecordWriter(generateDataFileLocation(overlayedConf, 
taskAttemptId));
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates query directory location based on the configuration.
+   * Currently it uses tableLocation/queryId
+   * @param conf The job's configuration
+   * @return The directory to store the query result files
+   */
+  private static String generateQueryLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    return tableLocation + "/" + queryId;
+  }
+
+  /**
+   * Generates datafile location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/jobId/taskAttemptId.
+   * @param conf The job's configuration
+   * @param taskAttemptId The TaskAttemptID for the task
+   * @return The file to store the results
+   */
+  private static String generateDataFileLocation(Configuration conf, 
TaskAttemptID taskAttemptId) {
+    return generateQueryLocation(conf) + "/" + taskAttemptId.getJobID() + "/" 
+ taskAttemptId.toString();
+  }
+
+  /**
+   * Generates commit file location based on the task configuration and a 
specific task id.
+   * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].committed.
+   * @param conf The job's configuration
+   * @param jobId The jobId for the task
+   * @param taskId The taskId for the commit file
+   * @return The file to store the results
+   */
+  private static String generateCommitFileLocation(Configuration conf, JobID 
jobId, int taskId) {
+    return generateQueryLocation(conf) + "/" + jobId + "/task-" + taskId + 
COMMITTED_EXTENSION;
+  }
+
+  /**
+   * Generates commit file location based on the task configuration.
+   * Currently it uses QUERY_LOCATION/jobId/task-[0..numTasks].committed.
+   * @param conf The job's configuration
+   * @param taskAttemptId The TaskAttemptID for the task
+   * @return The file to store the results
+   */
+  private static String generateCommitFileLocation(Configuration conf, 
TaskAttemptID taskAttemptId) {
+    return generateCommitFileLocation(conf, taskAttemptId.getJobID(), 
taskAttemptId.getTaskID().getId());
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(FileSystem ignored,
+      JobConf job, String name, Progressable progress) {
+
+    throw new UnsupportedOperationException("Please implement if needed");
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+      implements FileSinkOperator.RecordWriter, 
org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> {
+
+    private final String location;
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter(String location) throws IOException {
+      this.location = location;
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      ClosedFileData closedFileData = 
fileData.remove(context.getTaskAttemptID());
+      String commitFileLocation = 
generateCommitFileLocation(context.getJobConf(), context.getTaskAttemptID());
+
+      // If we created a new data file, then create the committed file for this
+      if (closedFileData != null) {
+        createCommittedFileFor(new HadoopFileIO(context.getJobConf()), 
closedFileData, commitFileLocation);
+      } else {
+        TaskType taskType = 
context.getTaskAttemptID().getTaskID().getTaskType();
+        boolean isWrite = 
context.getJobConf().getBoolean(HiveIcebergStorageHandler.WRITE_KEY, false);
+        boolean mapOnly = context.getJobConf().getNumReduceTasks() == 0;
+
+        // If we writing and the task is either reducer or a map in a map-only 
job then write an empty commit file
+        if (isWrite && (TaskType.REDUCE.equals(taskType) || 
(TaskType.MAP.equals(taskType) && mapOnly))) {
+          createCommittedFileFor(new HadoopFileIO(context.getJobConf()), new 
ClosedFileData(), commitFileLocation);
+        }
+      }
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) {
+      FileIO io = new HadoopFileIO(context.getJobConf());
+
+      // Clean up local cache for metadata
+      fileData.remove(context.getTaskAttemptID());
+
+      // Remove the result file for the failed task

Review comment:
       Added the abortTask handling

##########
File path: mr/src/main/java/org/apache/iceberg/mr/InputFormatConfig.java
##########
@@ -47,6 +47,9 @@ private InputFormatConfig() {
   public static final String CATALOG = "iceberg.mr.catalog";
   public static final String HADOOP_CATALOG_WAREHOUSE_LOCATION = 
"iceberg.mr.catalog.hadoop.warehouse.location";
   public static final String CATALOG_LOADER_CLASS = 
"iceberg.mr.catalog.loader.class";
+  public static final String WRITE_FILE_FORMAT = 
"iceberg.mr.write.file.format";

Review comment:
       Added the TODO message.
   Thanks for reminding me!

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -271,28 +271,25 @@ public void setupTask(TaskAttemptContext 
taskAttemptContext) {
     }
 
     @Override
-    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
-      return true;
+    public boolean needsTaskCommit(TaskAttemptContext context) {
+      // We need to commit if this is the last phase of a MapReduce process
+      return 
TaskType.REDUCE.equals(context.getTaskAttemptID().getTaskID().getTaskType()) ||
+          context.getJobConf().getNumReduceTasks() == 0;

Review comment:
       I have tested this by adding the jars to the Hive master from the 
apache repo and running the queries in local mode. I got an exception and 
started to investigate its source.
   
   The actual query I have used for this had an order by so it had both a 
Mapper and a Reducer phase:
   ```
   insert into purchases select * from purchases order by id;
   ```
   My debugging revealed that commitTask was called for both Mappers and 
Reducers. The JobRunner infrastructure calls it and has no information about 
whether the task has actually written anything anywhere. I think this is the 
purpose of the _needsTaskCommit()_ method: it lets the developer of the 
OutputCommitter decide.
   
   @massdosage: Do you have an easy example for HiveRunner write tests?
   
   Thanks,
   Peter




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to