pvary commented on a change in pull request #1407:
URL: https://github.com/apache/iceberg/pull/1407#discussion_r482098723
##########
File path:
mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
##########
@@ -19,21 +19,216 @@
package org.apache.iceberg.mr.hive;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.OutputFormat;
-import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.Progressable;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.avro.DataWriter;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.mr.Catalogs;
+import org.apache.iceberg.mr.HiveSerDeConfig;
+import org.apache.iceberg.mr.mapreduce.IcebergWritable;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-public class HiveIcebergOutputFormat<T> implements OutputFormat<Void, T> {
+public class HiveIcebergOutputFormat extends OutputFormat<NullWritable,
IcebergWritable>
+ implements HiveOutputFormat<NullWritable, IcebergWritable> {
+ private Configuration overlayedConf = null;
+ private Table table = null;
@Override
- public RecordWriter<Void, T> getRecordWriter(FileSystem ignored, JobConf
job, String name, Progressable progress) {
- throw new UnsupportedOperationException("Writing to an Iceberg table with
Hive is not supported");
+ @SuppressWarnings("rawtypes")
+ public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path
finalOutPath,
+ Class valueClass,
boolean isCompressed,
+ Properties
tableAndSerDeProperties,
+ Progressable
progress)
+ throws IOException {
+ this.overlayedConf = createOverlayedConf(jc, tableAndSerDeProperties);
+ this.table = Catalogs.loadTable(this.overlayedConf,
tableAndSerDeProperties);
+ return new IcebergRecordWriter();
+ }
+
+ /**
+ * Returns the union of the configuration and table properties with the
+ * table properties taking precedence.
+ */
+ private static Configuration createOverlayedConf(Configuration conf,
Properties tblProps) {
+ Configuration newConf = new Configuration(conf);
+ for (Map.Entry<Object, Object> prop : tblProps.entrySet()) {
+ newConf.set((String) prop.getKey(), (String) prop.getValue());
+ }
+ return newConf;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable>
getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress)
+ throws IOException {
+ return new IcebergRecordWriter();
+ }
+
+ @Override
+ public org.apache.hadoop.mapreduce.RecordWriter<NullWritable,
IcebergWritable> getRecordWriter(
+ TaskAttemptContext context)
+ throws IOException {
+ return new IcebergRecordWriter();
}
@Override
public void checkOutputSpecs(FileSystem ignored, JobConf job) {
- throw new UnsupportedOperationException("Writing to an Iceberg table with
Hive is not supported");
+ // Not doing any check.
+ }
+
+ @Override
+ public void checkOutputSpecs(JobContext context) {
+ // Not doing any check.
+ }
+
+ @Override
+ public OutputCommitter getOutputCommitter(TaskAttemptContext context) {
+ return new IcebergOutputCommitter();
+ }
+
+ protected class IcebergRecordWriter extends
org.apache.hadoop.mapreduce.RecordWriter<NullWritable, IcebergWritable>
+ implements FileSinkOperator.RecordWriter,
+ org.apache.hadoop.mapred.RecordWriter<NullWritable, IcebergWritable>
{
+
+ private final FileAppender<Record> appender;
+ private final String location;
+ private final FileFormat fileFormat;
+
+ IcebergRecordWriter() throws IOException {
+ this.fileFormat =
FileFormat.valueOf(overlayedConf.get(HiveSerDeConfig.WRITE_FILE_FORMAT).toUpperCase());
+
+ String queryId =
overlayedConf.get(HiveConf.ConfVars.HIVEQUERYID.varname);
+ this.location = table.location() + "/" + queryId + UUID.randomUUID();
+
+ FileIO io = new HadoopFileIO(overlayedConf);
+ OutputFile dataFile = io.newOutputFile(location);
+
+ switch (fileFormat) {
+ case AVRO:
+ this.appender = Avro.write(dataFile)
+ .schema(table.schema())
+ .createWriterFunc(DataWriter::create)
+ .named(fileFormat.name())
+ .build();
+ break;
+
+ case PARQUET:
+ this.appender = Parquet.write(dataFile)
+ .schema(table.schema())
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .named(fileFormat.name())
+ .build();
+ break;
+
+ case ORC:
+ this.appender = ORC.write(dataFile)
+ .schema(table.schema())
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .build();
+ break;
+
+ default:
+ throw new UnsupportedOperationException("Cannot write format: " +
fileFormat);
+ }
+ }
+
+ @Override
+ public void write(Writable row) {
+ Preconditions.checkArgument(row instanceof IcebergWritable);
+
+ // TODO partition handling
+ appender.add(((IcebergWritable) row).record());
+ }
+
+ @Override
+ public void write(NullWritable key, IcebergWritable value) {
+ write(value);
+ }
+
+ @Override
+ public void close(boolean abort) throws IOException {
+ appender.close();
+
+ DataFiles.Builder builder = DataFiles.builder(table.spec())
+ .withPath(location)
+ .withFormat(fileFormat)
+ .withFileSizeInBytes(appender.length())
+ .withMetrics(appender.metrics());
+
+ AppendFiles append = table.newAppend();
+ append = append.appendFile(builder.build());
+ append.commit();
+ }
+
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ close(false);
+ }
+
+ @Override
+ public void close(Reporter reporter) throws IOException {
+ close(false);
+ }
+ }
+
+ /**
+ * A dummy committer - not related to the Hive transactions.
+ */
+ private static final class IcebergOutputCommitter extends OutputCommitter {
Review comment:
Ok. Found the issue.
We need to set:
```
@Override
public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
jobConf.set("mapred.output.committer.class",
HiveIcebergOutputFormat.IcebergOutputCommitter.class.getName());
}
```
And implement org.apache.hadoop.mapred.OutputCommitter instead of the
mapreduce version.
This might work. :)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]