This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab15bf Hive: Update Hive write path for Tez (#2163)
3ab15bf is described below
commit 3ab15bf15c22f5d11dee6b2126b2e72bd99a4b3d
Author: Marton Bod <[email protected]>
AuthorDate: Thu Feb 4 23:34:05 2021 +0100
Hive: Update Hive write path for Tez (#2163)
Co-authored-by: Marton Bod <[email protected]>
---
.../mr/hive/HiveIcebergOutputCommitter.java | 26 +++--
.../iceberg/mr/hive/HiveIcebergOutputFormat.java | 4 +-
.../java/org/apache/iceberg/mr/hive/TezUtil.java | 121 +++++++++++++++++++++
.../mr/hive/TestHiveIcebergOutputCommitter.java | 5 +-
.../TestHiveIcebergStorageHandlerWithEngine.java | 6 +
5 files changed, 148 insertions(+), 14 deletions(-)
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
index eed0fdc..47cbf7d 100644
---
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
+++
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -82,11 +82,13 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
/**
* Collects the generated data files and creates a commit file storing the
data file list.
- * @param context The job context
+ * @param originalContext The task attempt context
* @throws IOException Thrown if there is an error writing the commit file
*/
@Override
- public void commitTask(TaskAttemptContext context) throws IOException {
+ public void commitTask(TaskAttemptContext originalContext) throws
IOException {
+ TaskAttemptContext context =
TezUtil.enrichContextWithAttemptWrapper(originalContext);
+
TaskAttemptID attemptID = context.getTaskAttemptID();
String fileForCommitLocation =
generateFileForCommitLocation(context.getJobConf(),
attemptID.getJobID(), attemptID.getTaskID().getId());
@@ -108,11 +110,13 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
/**
* Removes files generated by this task.
- * @param context The task context
+ * @param originalContext The task attempt context
* @throws IOException Thrown if there is an error closing the writer
*/
@Override
- public void abortTask(TaskAttemptContext context) throws IOException {
+ public void abortTask(TaskAttemptContext originalContext) throws IOException
{
+ TaskAttemptContext context =
TezUtil.enrichContextWithAttemptWrapper(originalContext);
+
// Clean up writer data from the local store
HiveIcebergRecordWriter writer =
HiveIcebergRecordWriter.removeWriter(context.getTaskAttemptID());
@@ -125,11 +129,13 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
/**
* Reads the commit files stored in the temp directory and collects the
generated committed data files.
* Appends the data files to the table. At the end removes the temporary
directory.
- * @param jobContext The job context
+ * @param originalContext The job context
* @throws IOException if there is a failure deleting the files
*/
@Override
- public void commitJob(JobContext jobContext) throws IOException {
+ public void commitJob(JobContext originalContext) throws IOException {
+ JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+
JobConf conf = jobContext.getJobConf();
Table table = Catalogs.loadTable(conf);
@@ -158,12 +164,14 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
/**
* Removes the generated data files, if there is a commit file already
generated for them.
* The cleanup at the end removes the temporary directory as well.
- * @param jobContext The job context
+ * @param originalContext The job context
* @param status The status of the job
* @throws IOException if there is a failure deleting the files
*/
@Override
- public void abortJob(JobContext jobContext, int status) throws IOException {
+ public void abortJob(JobContext originalContext, int status) throws
IOException {
+ JobContext jobContext = TezUtil.enrichContextWithVertexId(originalContext);
+
String location = generateJobLocation(jobContext.getJobConf(),
jobContext.getJobID());
LOG.info("Job {} is aborted. Cleaning job location {}",
jobContext.getJobID(), location);
@@ -215,7 +223,7 @@ public class HiveIcebergOutputCommitter extends
OutputCommitter {
JobConf conf = jobContext.getJobConf();
// If there are reducers, then every reducer will generate a result file.
// If this is a map only task, then every mapper will generate a result
file.
- int expectedFiles = conf.getNumReduceTasks() != 0 ?
conf.getNumReduceTasks() : conf.getNumMapTasks();
+ int expectedFiles = conf.getNumReduceTasks() > 0 ?
conf.getNumReduceTasks() : conf.getNumMapTasks();
ExecutorService executor = null;
try {
diff --git
a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
index 823490e..dc592ce 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputFormat.java
@@ -45,8 +45,6 @@ import org.apache.iceberg.mr.mapred.Container;
public class HiveIcebergOutputFormat<T> implements OutputFormat<NullWritable,
Container<Record>>,
HiveOutputFormat<NullWritable, Container<Record>> {
- private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
-
@Override
public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path
finalOutPath, Class valueClass,
boolean isCompressed, Properties tableAndSerDeProperties, Progressable
progress) {
@@ -65,7 +63,7 @@ public class HiveIcebergOutputFormat<T> implements
OutputFormat<NullWritable, Co
}
private static HiveIcebergRecordWriter writer(JobConf jc) {
- TaskAttemptID taskAttemptID =
TaskAttemptID.forName(jc.get(TASK_ATTEMPT_ID_KEY));
+ TaskAttemptID taskAttemptID = TezUtil.taskAttemptWrapper(jc);
Schema schema = HiveIcebergStorageHandler.schema(jc);
PartitionSpec spec = HiveIcebergStorageHandler.spec(jc);
FileFormat fileFormat =
FileFormat.valueOf(jc.get(InputFormatConfig.WRITE_FILE_FORMAT));
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
new file mode 100644
index 0000000..a69d183
--- /dev/null
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/TezUtil.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.mr.hive;
+
+import java.util.Objects;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.JobID;
+
+public final class TezUtil {
+
+  private static final String TASK_ATTEMPT_ID_KEY = "mapred.task.id";
+  // TezProcessor (Hive) propagates the vertex id under this key - available during Task commit phase
+  private static final String TEZ_VERTEX_ID_HIVE = "hive.tez.vertex.index";
+  // MROutputCommitter (Tez) propagates the vertex id under this key - available during DAG/Vertex commit phase
+  private static final String TEZ_VERTEX_ID_DAG = "mapreduce.task.vertex.id";
+
+  /**
+   * If a non-empty Tez vertex id is present in config, creates a new jobContext by appending the Tez vertex id
+   * to the jobID. Otherwise the original jobContext is returned as-is.
+   * For the rationale behind this enrichment, please refer to point #1 in the docs of {@link TaskAttemptWrapper}.
+   * @param jobContext original jobContext to be enriched
+   * @return enriched jobContext, or the original one if there is no vertex id to append
+   */
+  public static JobContext enrichContextWithVertexId(JobContext jobContext) {
+    String vertexId = jobContext.getJobConf().get(TEZ_VERTEX_ID_DAG);
+    if (vertexId == null || vertexId.isEmpty()) {
+      // Nothing to append: the job ID would be unchanged, so avoid building an equivalent context
+      return jobContext;
+    }
+
+    JobID jobID = getJobIDWithVertexAppended(jobContext.getJobID(), vertexId);
+    return new JobContextImpl(jobContext.getJobConf(), jobID, jobContext.getProgressible());
+  }
+
+  /**
+   * Creates a new taskAttemptContext by replacing the taskAttemptID with a wrapped object.
+   * Only the JobConf and the wrapped attempt ID are carried over to the new context.
+   * For the rationale behind this enrichment, please refer to point #2 in the docs of {@link TaskAttemptWrapper}.
+   * @param taskAttemptContext original taskAttemptContext to be enriched
+   * @return enriched taskAttemptContext
+   */
+  public static TaskAttemptContext enrichContextWithAttemptWrapper(TaskAttemptContext taskAttemptContext) {
+    TaskAttemptID wrapped = taskAttemptWrapper(taskAttemptContext.getTaskAttemptID());
+    return new TaskAttemptContextImpl(taskAttemptContext.getJobConf(), wrapped);
+  }
+
+  /**
+   * Wraps the given attempt ID without appending any vertex id to its job ID.
+   * @param attemptID the attempt ID to wrap, must not be null
+   * @return the wrapped attempt ID, whose equality ignores the task type
+   * @throws NullPointerException if attemptID is null
+   */
+  public static TaskAttemptID taskAttemptWrapper(TaskAttemptID attemptID) {
+    Objects.requireNonNull(attemptID, "Task attempt ID must not be null");
+    return new TaskAttemptWrapper(attemptID, "");
+  }
+
+  /**
+   * Builds a wrapped attempt ID from the attempt ID string stored in the config under {@code mapred.task.id},
+   * appending the vertex id propagated by Hive under {@code hive.tez.vertex.index} (if any) to its job ID.
+   * @param jc the job config to read the attempt ID and vertex id from
+   * @return the wrapped attempt ID, whose equality ignores the task type
+   * @throws NullPointerException if the config contains no task attempt ID
+   */
+  public static TaskAttemptID taskAttemptWrapper(JobConf jc) {
+    // Fail fast with a clear message instead of an opaque NPE inside the wrapper constructor
+    String attemptIdString = Objects.requireNonNull(jc.get(TASK_ATTEMPT_ID_KEY),
+        "Missing task attempt ID in job config under key: " + TASK_ATTEMPT_ID_KEY);
+    return new TaskAttemptWrapper(TaskAttemptID.forName(attemptIdString), jc.get(TEZ_VERTEX_ID_HIVE));
+  }
+
+  /**
+   * Returns a job ID whose jtIdentifier has the vertex id appended, or the original job ID when the vertex id
+   * is null or empty.
+   */
+  private static JobID getJobIDWithVertexAppended(JobID jobID, String vertexId) {
+    if (vertexId != null && !vertexId.isEmpty()) {
+      return new JobID(jobID.getJtIdentifier() + vertexId, jobID.getId());
+    } else {
+      return jobID;
+    }
+  }
+
+  private TezUtil() {
+  }
+
+  /**
+   * Subclasses {@link org.apache.hadoop.mapred.TaskAttemptID}. It has two main purposes:
+   * 1. Provide a way to append an optional vertex id to the Job ID. This is needed because there is a
+   * discrepancy between how the attempt ID is constructed in the
+   * {@link org.apache.tez.mapreduce.output.MROutput} (with vertex ID appended to the end of the Job ID) and how
+   * it's available in the mapper (without vertex ID) which creates and caches the HiveIcebergRecordWriter object.
+   * 2. Redefine the equals/hashcode provided by TaskAttemptID so that task type (map or reduce) does not count,
+   * and therefore the mapper and reducer threads can use the same attempt ID-based key to retrieve the cached
+   * HiveIcebergRecordWriter object.
+   */
+  private static final class TaskAttemptWrapper extends TaskAttemptID {
+
+    TaskAttemptWrapper(TaskAttemptID attemptID, String vertexId) {
+      super(getJobIDWithVertexAppended(attemptID.getJobID(), vertexId).getJtIdentifier(),
+          attemptID.getJobID().getId(), attemptID.getTaskType(), attemptID.getTaskID().getId(), attemptID.getId());
+    }
+
+    // Equality deliberately compares only the attempt number, task number and job ID - not the task type
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      TaskAttemptWrapper that = (TaskAttemptWrapper) o;
+      return getId() == that.getId() &&
+          getTaskID().getId() == that.getTaskID().getId() &&
+          Objects.equals(getJobID(), that.getJobID());
+    }
+
+    // Must hash exactly the fields compared in equals to keep the contract
+    @Override
+    public int hashCode() {
+      return Objects.hash(getId(), getTaskID().getId(), getJobID());
+    }
+  }
+}
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
index 73f9786..d6202f9 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergOutputCommitter.java
@@ -205,7 +205,7 @@ public class TestHiveIcebergOutputCommitter {
}
Assert.assertEquals(1, argumentCaptor.getAllValues().size());
- TaskAttemptID capturedId = argumentCaptor.getValue().getTaskAttemptID();
+ TaskAttemptID capturedId =
TezUtil.taskAttemptWrapper(argumentCaptor.getValue().getTaskAttemptID());
// writer is still in the map after commitTask failure
Assert.assertNotNull(getWriter(capturedId));
failingCommitter.abortTask(new TaskAttemptContextImpl(conf, capturedId));
@@ -264,7 +264,8 @@ public class TestHiveIcebergOutputCommitter {
new OutputFileFactory(spec, FileFormat.PARQUET, location, io,
encryption, taskId.getTaskID().getId(),
attemptNum, QUERY_ID + "-" + JOB_ID);
HiveIcebergRecordWriter testWriter = new HiveIcebergRecordWriter(schema,
spec, FileFormat.PARQUET,
- new GenericAppenderFactory(schema), outputFileFactory, io,
TARGET_FILE_SIZE, taskId);
+ new GenericAppenderFactory(schema), outputFileFactory, io,
TARGET_FILE_SIZE,
+ TezUtil.taskAttemptWrapper(taskId));
Container<Record> container = new Container<>();
diff --git
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
index 8ff3786..f74039b 100644
---
a/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
+++
b/mr/src/test/java/org/apache/iceberg/mr/hive/TestHiveIcebergStorageHandlerWithEngine.java
@@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
@@ -155,6 +156,11 @@ public class TestHiveIcebergStorageHandlerWithEngine {
// HiveServer2 thread pools are using thread local Hive -> HMSClient
objects. These are not cleaned up when the
// HiveServer2 is stopped. Only Finalizer closes the HMS connections.
System.gc();
+ // Mixing mr and tez jobs within the same JVM can cause problems. Mr jobs
set the ExecMapper status to done=false
+ // at the beginning and to done=true at the end. However, tez jobs also
rely on this value to see if they should
+ // proceed, but they do not reset it to done=false at the beginning.
Therefore, without calling this after each test
+ // case, any tez job that follows a completed mr job will erroneously read
done=true and will not proceed.
+ ExecMapper.setDone(false);
}
@Test