This is an automated email from the ASF dual-hosted git repository.

blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab15bf  Hive: Update Hive write path for Tez (#2163)
3ab15bf is described below

commit 3ab15bf15c22f5d11dee6b2126b2e72bd99a4b3d
Author: Marton Bod <[email protected]>
AuthorDate: Thu Feb 4 23:34:05 2021 +0100

    Hive: Update Hive write path for Tez (#2163)
    
    Co-authored-by: Marton Bod <[email protected]>
---
 .../mr/hive/HiveIcebergOutputCommitter.java        |  26 +++--
 .../iceberg/mr/hive/HiveIcebergOutputFormat.java   |   4 +-
 .../java/org/apache/iceberg/mr/hive/TezUtil.java   | 121 +++++++++++++++++++++
 .../mr/hive/TestHiveIcebergOutputCommitter.java    |   5 +-
 .../TestHiveIcebergStorageHandlerWithEngine.java   |   6 +
 5 files changed, 148 insertions(+), 14 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index eed0fdc..47cbf7d 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -82,11 +82,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
   /**
    * Collects the generated data files and creates a commit file storing the data file list.
-   * @param context The job context
+   * @param originalContext The task attempt context
    * @throws IOException Thrown if there is an error writing the commit file
    */
   @Override
-  public void commitTask(TaskAttemptContext context) throws IOException {
+  public void commitTask(TaskAttemptContext originalContext) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
+
     TaskAttemptID attemptID = context.getTaskAttemptID();
     String fileForCommitLocation = generateFileForCommitLocation(context.getJobConf(),
         attemptID.getJobID(), attemptID.getTaskID().getId());
@@ -108,11 +110,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
 
   /**
    * Removes files generated by this task.
-   * @param context The task context
+   * @param originalContext The task attempt context
    * @throws IOException Thrown if there is an error closing the writer
    */
   @Override
-  public void abortTask(TaskAttemptContext context) throws IOException {
+  public void abortTask(TaskAttemptContext originalContext) throws IOException {
+    TaskAttemptContext context = TezUtil.enrichContextWithAttemptWrapper(originalContext);
+
     // Clean up writer data from the local store
     HiveIcebergRecordWriter writer = HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
 
@@ -125,11 +129,13 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
   /**
    * Reads the commit files stored in the temp directory and collects the generated committed data files.
    * Appends the data files to the table. At the end removes the temporary directory.
-   * @param jobContext The job context
+   * @param originalContext The job context
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void commitJob(JobContext jobContext) throws IOException {
+  public void commitJob(JobContext originalContext) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+
     JobConf conf = jobContext.getJobConf();
     Table table = Catalogs.loadTable(conf);
 
@@ -158,12 +164,14 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
   /**
    * Removes the generated data files, if there is a commit file already generated for them.
    * The cleanup at the end removes the temporary directory as well.
-   * @param jobContext The job context
+   * @param originalContext The job context
    * @param status The status of the job
    * @throws IOException if there is a failure deleting the files
    */
   @Override
-  public void abortJob(JobContext jobContext, int status) throws IOException {
+  public void abortJob(JobContext originalContext, int status) throws IOException {
+    JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+
     String location = generateJobLocation(jobContext.getJobConf(), jobContext.getJobID());
     LOG.info("Job {} is aborted. Cleaning job location {}", jobContext.getJobID(), location);
 
@@ -215,7 +223,7 @@ public class HiveIcebergOutputCommitter extends OutputCommitter {
     JobConf conf = jobContext.getJobConf();
     // If there are reducers, then every reducer will generate a result file.
     // If this is a map only task, then every mapper will generate a result file.
-    int expectedFiles = conf.getNumReduceTasks() != 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
+    int expectedFiles = conf.getNumReduceTasks() > 0 ? conf.getNumReduceTasks() : conf.getNumMapTasks();
 
     ExecutorService executor = null;
     try {
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 823490e..dc592ce 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -45,8 +45,6 @@ import org.apache.iceberg.mr.mapred.Container;
 public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Container<Record>>,
     HiveOutputFormat<NullWritable, Container<Record>> {
 
-  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
-
   @Override
   public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, Class valueClass,
       boolean isCompressed, Properties tableAndSerDeProperties, Progressable progress) {
@@ -65,7 +63,7 @@ public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable, Co
   }
 
   private static HiveIcebergRecordWriter writer(JobConf jc) {
-    TaskAttemptID taskAttemptID = TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+    TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
     Schema schema = HiveIcebergStorageHandler.schema(jc);
     PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
     FileFormat fileFormat = FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
new file mode 100644
index 0000000..a69d183
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Objects;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+
+public class TezUtil {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  // TezProcessor (Hive) propagates the vertex id under this key - available during Task commit phase
+  private static final String TEZ_VERTEX_ID_HIVE = "hive.tez.vertex.index";
+  // MROutputCommitter (Tez) propagates the vertex id under this key - available during DAG/Vertex commit phase
+  private static final String TEZ_VERTEX_ID_DAG = "mapreduce.task.vertex.id";
+
+  /**
+   * If the Tez vertex id is present in config, creates a new jobContext by appending the Tez vertex id to the jobID.
+   * For the rationale behind this enrichment, please refer to point #1 in the docs of {@link TaskAttemptWrapper}.
+   * @param jobContext original jobContext to be enriched
+   * @return enriched jobContext
+   */
+  public static JobContext enrichContextWithVertexId(JobContext jobContext) {
+    String vertexId = jobContext.getJobConf().get(TEZ_VERTEX_ID_DAG);
+    if (vertexId != null) {
+      JobID jobID = getJobIDWithVertexAppended(jobContext.getJobID(), vertexId);
+      return new JobContextImpl(jobContext.getJobConf(), jobID, jobContext.getProgressible());
+    } else {
+      return jobContext;
+    }
+  }
+
+  /**
+   * Creates a new taskAttemptContext by replacing the taskAttemptID with a wrapped object.
+   * For the rationale behind this enrichment, please refer to point #2 in the docs of {@link TaskAttemptWrapper}.
+   * @param taskAttemptContext original taskAttemptContext to be enriched
+   * @return enriched taskAttemptContext
+   */
+  public static TaskAttemptContext enrichContextWithAttemptWrapper(TaskAttemptContext taskAttemptContext) {
+    TaskAttemptID wrapped = TezUtil.taskAttemptWrapper(taskAttemptContext.getTaskAttemptID());
+    return new TaskAttemptContextImpl(taskAttemptContext.getJobConf(), wrapped);
+  }
+
+  public static TaskAttemptID taskAttemptWrapper(TaskAttemptID attemptID) {
+    return new TaskAttemptWrapper(attemptID, "");
+  }
+
+  public static TaskAttemptID taskAttemptWrapper(JobConf jc) {
+    return new TaskAttemptWrapper(TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY)), jc.get(TEZ_VERTEX_ID_HIVE));
+  }
+
+  private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) {
+    if (vertexId != null && !vertexId.isEmpty()) {
+      return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId());
+    } else {
+      return jobID;
+    }
+  }
+
+  private TezUtil() {
+  }
+
+  /**
+   * Subclasses {@link org.apache.hadoop.mapred.TaskAttemptID}. It has two main purposes:
+   * 1. Provide a way to append an optional vertex id to the Job ID. This is needed because there is a discrepancy
+   * between how the attempt ID is constructed in the {@link org.apache.tez.mapreduce.output.MROutput} (with vertex ID
+   * appended to the end of the Job ID) and how it's available in the mapper (without vertex ID) which creates and
+   * caches the HiveIcebergRecordWriter object.
+   * 2. Redefine the equals/hashcode provided by TaskAttemptID so that task type (map or reduce) does not count, and
+   * therefore the mapper and reducer threads can use the same attempt ID-based key to retrieve the cached
+   * HiveIcebergRecordWriter object.
+   */
+  private static class TaskAttemptWrapper extends TaskAttemptID {
+
+    TaskAttemptWrapper(TaskAttemptID attemptID, String vertexId) {
+      super(getJobIDWithVertexAppended(attemptID.getJobID(), vertexId).getJtIdentifier(),
+          attemptID.getJobID().getId(), attemptID.getTaskType(), attemptID.getTaskID().getId(), attemptID.getId());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TaskAttemptWrapper that = (TaskAttemptWrapper) o;
+      return getId() == that.getId() &&
+          getTaskID().getId() == that.getTaskID().getId() &&
+          Objects.equals(getJobID(), that.getJobID());
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(getId(), getTaskID().getId(), getJobID());
+    }
+  }
+}
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 73f9786..d6202f9 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -205,7 +205,7 @@ public class TestHiveIcebergOutputCommitter {
     }
 
     Assert.assertEquals(1, argumentCaptor.getAllValues().size());
-    TaskAttemptID capturedId = argumentCaptor.getValue().getTaskAttemptID();
+    TaskAttemptID capturedId = TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
     // writer is still in the map after commitTask failure
     Assert.assertNotNull(getWriter(capturedId));
     failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
@@ -264,7 +264,8 @@ public class TestHiveIcebergOutputCommitter {
           new OutputFileFactory(spec, FileFormat.PARQUET, location, io, encryption, taskId.getTaskID().getId(),
               attemptNum, QUERY_ID + "-" + JOB_ID);
       HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema, spec, FileFormat.PARQUET,
-          new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE, taskId);
+          new GenericAppenderFactory(schema), outputFileFactory, io, TARGET_FILE_SIZE,
+          TezUtil.taskAttemptWrapper(taskId));
 
       Container<Record> container = new Container<>();
 
diff --git a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 8ff3786..f74039b 100644
--- a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++ b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -155,6 +156,11 @@ public class TestHiveIcebergStorageHandlerWithEngine {
     // HiveServer2 thread pools are using thread local Hive -> HMSClient objects. These are not cleaned up when the
     // HiveServer2 is stopped. Only Finalizer closes the HMS connections.
     System.gc();
+    // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs set the ExecMapper status to done=false
+    // at the beginning and to done=true at the end. However, tez jobs also rely on this value to see if they should
+    // proceed, but they do not reset it to done=false at the beginning. Therefore, without calling this after each test
+    // case, any tez job that follows a completed mr job will erroneously read done=true and will not proceed.
+    ExecMapper.setDone(false);
   }
 
   @Test

Reply via email to