steveloughran commented on a change in pull request #1407:
URL: https://github.com/apache/iceberg/pull/1407#discussion_r483706822



##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,342 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_PREFIX = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<String, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private String taskAttemptId = null;
+  private Schema schema = null;
+  private String location = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath,
+                                                           Class valueClass, 
boolean isCompressed,
+                                                           Properties 
tableAndSerDeProperties,
+                                                           Progressable 
progress)
+      throws IOException {
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = overlayedConf.get(TASK_ATTEMPT_ID_KEY);
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.location = generateLocation(overlayedConf);
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+    fileData.remove(this.taskAttemptId);
+    return new IcebergRecordWriter();
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates file location based on the task configuration.
+   * Currently it uses tableLocation/queryId/taskAttemptId
+   * @param conf The job's configuration
+   * @return The directory to store the result files
+   */
+  private static String generateLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    String taskAttemptId = conf.get(TASK_ATTEMPT_ID_KEY);
+    return tableLocation + "/" + queryId + "/" + taskAttemptId;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new IcebergRecordWriter();
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+          implements FileSinkOperator.RecordWriter,
+          org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
{
+
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter() throws IOException {
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+      ClosedFileData closedFileData = 
fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+      createCommittedFileFor(new 
HadoopFileIO(taskAttemptContext.getJobConf()), closedFileData);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getJobConf();
+      Table table = Catalogs.loadTable(conf);
+      AppendFiles append = table.newAppend();
+      String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+      String location = conf.get(InputFormatConfig.TABLE_LOCATION);
+      Path resultPath = new Path(location, queryId);
+      FileSystem fs = Util.getFs(resultPath, conf);
+      RemoteIterator<LocatedFileStatus> fileStatuses = 
fs.listFiles(resultPath, false);
+      Set<String> filesToKeep = new HashSet<>();
+      Set<String> files = new HashSet<>();
+      while (fileStatuses.hasNext()) {
+        LocatedFileStatus status = fileStatuses.next();
+        String fileName = status.getPath().getName();
+        files.add(resultPath.toString() + "/" + fileName);
+        if (fileName.endsWith(COMMITTED_PREFIX)) {
+          LOG.debug("Reading committed file {}", fileName);
+          ClosedFileData cfd = readCommittedFile(table.io(), 
resultPath.toString() + "/" + fileName);
+          DataFiles.Builder builder = 
DataFiles.builder(PartitionSpec.unpartitioned())
+              .withPath(cfd.fileName)
+              .withFormat(cfd.fileFormat)
+              .withFileSizeInBytes(cfd.length)
+              .withMetrics(cfd.serializableMetrics.metrics());
+          append = append.appendFile(builder.build());
+          filesToKeep.add(cfd.fileName);
+        }
+      }
+      append.commit();
+      LOG.info("Iceberg write is committed for {} with files {}", table, 
filesToKeep);
+      files.removeAll(filesToKeep);
+      LOG.debug("Removing unused files: {}", files);
+      files.forEach(file -> table.io().deleteFile(file));

Review comment:
       this is potentially slow: could you do it async

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,342 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_PREFIX = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<String, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private String taskAttemptId = null;
+  private Schema schema = null;
+  private String location = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath,
+                                                           Class valueClass, 
boolean isCompressed,
+                                                           Properties 
tableAndSerDeProperties,
+                                                           Progressable 
progress)
+      throws IOException {
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = overlayedConf.get(TASK_ATTEMPT_ID_KEY);
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.location = generateLocation(overlayedConf);
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+    fileData.remove(this.taskAttemptId);
+    return new IcebergRecordWriter();
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates file location based on the task configuration.
+   * Currently it uses tableLocation/queryId/taskAttemptId
+   * @param conf The job's configuration
+   * @return The directory to store the result files
+   */
+  private static String generateLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    String taskAttemptId = conf.get(TASK_ATTEMPT_ID_KEY);
+    return tableLocation + "/" + queryId + "/" + taskAttemptId;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new IcebergRecordWriter();
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+          implements FileSinkOperator.RecordWriter,
+          org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
{
+
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter() throws IOException {
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+      ClosedFileData closedFileData = 
fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+      createCommittedFileFor(new 
HadoopFileIO(taskAttemptContext.getJobConf()), closedFileData);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getJobConf();
+      Table table = Catalogs.loadTable(conf);
+      AppendFiles append = table.newAppend();
+      String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+      String location = conf.get(InputFormatConfig.TABLE_LOCATION);
+      Path resultPath = new Path(location, queryId);
+      FileSystem fs = Util.getFs(resultPath, conf);
+      RemoteIterator<LocatedFileStatus> fileStatuses = 
fs.listFiles(resultPath, false);
+      Set<String> filesToKeep = new HashSet<>();
+      Set<String> files = new HashSet<>();
+      while (fileStatuses.hasNext()) {
+        LocatedFileStatus status = fileStatuses.next();
+        String fileName = status.getPath().getName();
+        files.add(resultPath.toString() + "/" + fileName);
+        if (fileName.endsWith(COMMITTED_PREFIX)) {
+          LOG.debug("Reading committed file {}", fileName);
+          ClosedFileData cfd = readCommittedFile(table.io(), 
resultPath.toString() + "/" + fileName);
+          DataFiles.Builder builder = 
DataFiles.builder(PartitionSpec.unpartitioned())
+              .withPath(cfd.fileName)
+              .withFormat(cfd.fileFormat)
+              .withFileSizeInBytes(cfd.length)
+              .withMetrics(cfd.serializableMetrics.metrics());
+          append = append.appendFile(builder.build());
+          filesToKeep.add(cfd.fileName);
+        }
+      }
+      append.commit();
+      LOG.info("Iceberg write is committed for {} with files {}", table, 
filesToKeep);
+      files.removeAll(filesToKeep);
+      LOG.debug("Removing unused files: {}", files);
+      files.forEach(file -> table.io().deleteFile(file));
+      cleanupJob(jobContext);
+    }
+  }
+
+  private static void createCommittedFileFor(FileIO io, ClosedFileData 
closedFileData) throws IOException {
+    OutputFile commitFile = io.newOutputFile(closedFileData.fileName + 
COMMITTED_PREFIX);
+    ObjectOutputStream oos = new 
ObjectOutputStream(commitFile.createOrOverwrite());
+    oos.writeObject(closedFileData);
+    oos.close();
+    LOG.debug("Iceberg committed file is created {}", commitFile);
+  }
+
+  private static ClosedFileData readCommittedFile(FileIO io, String 
committedFileLocation) throws IOException {
+    try (ObjectInputStream ois = new 
ObjectInputStream(io.newInputFile(committedFileLocation).newStream())) {
+      return (ClosedFileData) ois.readObject();
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Can not parse committed file: " + 
committedFileLocation, cnfe);
+    }
+  }
+
+  private static final class ClosedFileData implements Serializable {
+    private String fileName;
+    private FileFormat fileFormat;
+    private Long length;
+    private SerializableMetrics serializableMetrics;
+
+    private ClosedFileData(String fileName, FileFormat fileFormat, Long 
length, Metrics metrics) {
+      this.fileName = fileName;
+      this.fileFormat = fileFormat;
+      this.length = length;
+      this.serializableMetrics = new SerializableMetrics(metrics);
+    }
+  }
+
+  /**
+   * We need this class, since Metrics in not Serializable (even though it 
implements the interface)
+   */
+  private static final class SerializableMetrics implements Serializable {
+    private Long rowCount;
+    private Map<Integer, Long> columnSizes;

Review comment:
       Don't think HashMap is serializable. TreeMap is, FWIW.

##########
File path: 
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,342 @@
 
 package org.apache.iceberg.mr.hive;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCommitter;
 import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Metrics;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SchemaParser;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.hadoop.Util;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.InputFormatConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat implements OutputFormat<NullWritable, 
IcebergWritable>,
+    HiveOutputFormat<NullWritable, IcebergWritable> {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HiveIcebergOutputFormat.class);
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  private static final String COMMITTED_PREFIX = ".committed";
+
+  // <TaskAttemptId, ClosedFileData> map to store the data needed to create 
DataFiles
+  // Stored in concurrent map, since some executor engines can share containers
+  private static final Map<String, ClosedFileData> fileData = new 
ConcurrentHashMap<>();
+
+  private Configuration overlayedConf = null;
+  private String taskAttemptId = null;
+  private Schema schema = null;
+  private String location = null;
+  private FileFormat fileFormat = null;
+
+  @Override
+  @SuppressWarnings("rawtypes")
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path 
finalOutPath,
+                                                           Class valueClass, 
boolean isCompressed,
+                                                           Properties 
tableAndSerDeProperties,
+                                                           Progressable 
progress)
+      throws IOException {
+    this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+    this.taskAttemptId = overlayedConf.get(TASK_ATTEMPT_ID_KEY);
+    this.schema = 
SchemaParser.fromJson(overlayedConf.get(InputFormatConfig.TABLE_SCHEMA));
+    this.location = generateLocation(overlayedConf);
+    this.fileFormat = 
FileFormat.valueOf(overlayedConf.get(InputFormatConfig.WRITE_FILE_FORMAT).toUpperCase());
+    fileData.remove(this.taskAttemptId);
+    return new IcebergRecordWriter();
+  }
+
+  /**
+   * Returns the union of the configuration and table properties with the
+   * table properties taking precedence.
+   */
+  private static Configuration createOverlayedConf(Configuration conf, 
Properties tblProps) {
+    Configuration newConf = new Configuration(conf);
+    for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+      newConf.set((String) prop.getKey(), (String) prop.getValue());
+    }
+    return newConf;
+  }
+
+  /**
+   * Generates file location based on the task configuration.
+   * Currently it uses tableLocation/queryId/taskAttemptId
+   * @param conf The job's configuration
+   * @return The directory to store the result files
+   */
+  private static String generateLocation(Configuration conf) {
+    String tableLocation = conf.get(InputFormatConfig.TABLE_LOCATION);
+    String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+    String taskAttemptId = conf.get(TASK_ATTEMPT_ID_KEY);
+    return tableLocation + "/" + queryId + "/" + taskAttemptId;
+  }
 
   @Override
-  public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf 
job, String name, Progressable progress) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+  public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+    return new IcebergRecordWriter();
   }
 
   @Override
   public void checkOutputSpecs(FileSystem ignored, JobConf job) {
-    throw new UnsupportedOperationException("Writing to an Iceberg table with 
Hive is not supported");
+    // Not doing any check.
+  }
+
+  protected class IcebergRecordWriter extends 
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+          implements FileSinkOperator.RecordWriter,
+          org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable> 
{
+
+    private final FileAppender<Record> appender;
+    private final FileIO io;
+
+    IcebergRecordWriter() throws IOException {
+      io = new HadoopFileIO(overlayedConf);
+      OutputFile dataFile = io.newOutputFile(location);
+
+      switch (fileFormat) {
+        case AVRO:
+          this.appender = Avro.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(DataWriter::create)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case PARQUET:
+          this.appender = Parquet.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericParquetWriter::buildWriter)
+              .named(fileFormat.name())
+              .build();
+          break;
+
+        case ORC:
+          this.appender = ORC.write(dataFile)
+              .schema(schema)
+              .createWriterFunc(GenericOrcWriter::buildWriter)
+              .build();
+          break;
+
+        default:
+          throw new UnsupportedOperationException("Cannot write format: " + 
fileFormat);
+      }
+      LOG.info("IcebergRecordWriter is created in {} with {}", location, 
fileFormat);
+    }
+
+    @Override
+    public void write(Writable row) {
+      Preconditions.checkArgument(row instanceof IcebergWritable);
+
+      // TODO partition handling
+      appender.add(((IcebergWritable) row).record());
+    }
+
+    @Override
+    public void write(NullWritable key, IcebergWritable value) {
+      write(value);
+    }
+
+    @Override
+    public void close(boolean abort) throws IOException {
+      appender.close();
+      if (!abort) {
+        fileData.put(taskAttemptId, new ClosedFileData(location, fileFormat, 
appender.length(), appender.metrics()));
+      }
+    }
+
+    @Override
+    public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context) 
throws IOException {
+      close(false);
+    }
+
+    @Override
+    public void close(Reporter reporter) throws IOException {
+      close(false);
+    }
+  }
+
+  /**
+   * A dummy committer - not related to the Hive transactions.
+   */
+  public static final class IcebergOutputCommitter extends OutputCommitter {
+
+    @Override
+    public void setupJob(JobContext jobContext) {
+      // do nothing.
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskAttemptContext) {
+      // do nothing.
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
+      return true;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskAttemptContext) throws 
IOException {
+      ClosedFileData closedFileData = 
fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+      createCommittedFileFor(new 
HadoopFileIO(taskAttemptContext.getJobConf()), closedFileData);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskAttemptContext) {
+      fileData.remove(taskAttemptContext.getTaskAttemptID().toString());
+    }
+
+    @Override
+    public void commitJob(JobContext jobContext) throws IOException {
+      Configuration conf = jobContext.getJobConf();
+      Table table = Catalogs.loadTable(conf);
+      AppendFiles append = table.newAppend();
+      String queryId = conf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+      String location = conf.get(InputFormatConfig.TABLE_LOCATION);
+      Path resultPath = new Path(location, queryId);
+      FileSystem fs = Util.getFs(resultPath, conf);
+      RemoteIterator<LocatedFileStatus> fileStatuses = 
fs.listFiles(resultPath, false);
+      Set<String> filesToKeep = new HashSet<>();
+      Set<String> files = new HashSet<>();
+      while (fileStatuses.hasNext()) {
+        LocatedFileStatus status = fileStatuses.next();
+        String fileName = status.getPath().getName();
+        files.add(resultPath.toString() + "/" + fileName);
+        if (fileName.endsWith(COMMITTED_PREFIX)) {
+          LOG.debug("Reading committed file {}", fileName);
+          ClosedFileData cfd = readCommittedFile(table.io(), 
resultPath.toString() + "/" + fileName);
+          DataFiles.Builder builder = 
DataFiles.builder(PartitionSpec.unpartitioned())
+              .withPath(cfd.fileName)
+              .withFormat(cfd.fileFormat)
+              .withFileSizeInBytes(cfd.length)
+              .withMetrics(cfd.serializableMetrics.metrics());
+          append = append.appendFile(builder.build());
+          filesToKeep.add(cfd.fileName);
+        }
+      }
+      append.commit();
+      LOG.info("Iceberg write is committed for {} with files {}", table, 
filesToKeep);
+      files.removeAll(filesToKeep);
+      LOG.debug("Removing unused files: {}", files);
+      files.forEach(file -> table.io().deleteFile(file));
+      cleanupJob(jobContext);
+    }
+  }
+
+  private static void createCommittedFileFor(FileIO io, ClosedFileData 
closedFileData) throws IOException {
+    OutputFile commitFile = io.newOutputFile(closedFileData.fileName + 
COMMITTED_PREFIX);
+    ObjectOutputStream oos = new 
ObjectOutputStream(commitFile.createOrOverwrite());
+    oos.writeObject(closedFileData);
+    oos.close();
+    LOG.debug("Iceberg committed file is created {}", commitFile);
+  }
+
+  private static ClosedFileData readCommittedFile(FileIO io, String 
committedFileLocation) throws IOException {
+    try (ObjectInputStream ois = new 
ObjectInputStream(io.newInputFile(committedFileLocation).newStream())) {
+      return (ClosedFileData) ois.readObject();
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException("Can not parse committed file: " + 
committedFileLocation, cnfe);
+    }
+  }
+
+  private static final class ClosedFileData implements Serializable {
+    private String fileName;
+    private FileFormat fileFormat;
+    private Long length;
+    private SerializableMetrics serializableMetrics;
+
+    private ClosedFileData(String fileName, FileFormat fileFormat, Long 
length, Metrics metrics) {
+      this.fileName = fileName;
+      this.fileFormat = fileFormat;
+      this.length = length;
+      this.serializableMetrics = new SerializableMetrics(metrics);
+    }
+  }
+
+  /**
+   * We need this class, since Metrics in not Serializable (even though it 
implements the interface)
+   */
+  private static final class SerializableMetrics implements Serializable {

Review comment:
       I'm adding a metrics API For FS classes (readers, writers etc) to 
optionally implement: https://github.com/apache/hadoop/pull/2069
   
   
https://github.com/steveloughran/hadoop/blob/s3/HADOOP-16830-iostatistics/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/iostatistics.md
   
   While you can't use that (yet), it'd be good to design your stats so that 
they can be included




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to