This is an automated email from the ASF dual-hosted git repository.
iwasakims pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/bigtop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 29d3ad50a BIGTOP-4022. Fix build failure of Hive against Tez 0.10.2. (#1196)
29d3ad50a is described below
commit 29d3ad50a79d52f0889b40b5a08deadf610a7720
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Oct 18 13:26:54 2023 +0900
BIGTOP-4022. Fix build failure of Hive against Tez 0.10.2. (#1196)
(cherry picked from commit 157e19e87d6881f295385947409f1ce9c687f075)
---
.../src/common/hive/patch1-HIVE-23190.diff | 43 ----
.../src/common/hive/patch1-HIVE-27336.diff | 258 +++++++++++++++++++++
2 files changed, 258 insertions(+), 43 deletions(-)
diff --git a/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff b/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff
deleted file mode 100644
index 0eea57de0..000000000
--- a/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff
+++ /dev/null
@@ -1,43 +0,0 @@
-diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-index 4de03f232d..d8c18776ec 100644
---- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-@@ -25,6 +25,7 @@
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.tez.runtime.library.common.Constants;
- import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-@@ -43,10 +44,21 @@
- private final LinkedBlockingQueue<String> queue =
- new LinkedBlockingQueue<String>();
-
-+ private FileSystem fs;
-+
- public IndexCache(Configuration conf) {
- this.conf = conf;
- totalMemoryAllowed = 10 * 1024 * 1024;
- LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
-+ initLocalFs();
-+ }
-+
-+ private void initLocalFs() {
-+ try {
-+ this.fs = FileSystem.getLocal(conf).getRaw();
-+ } catch (IOException e) {
-+ throw new RuntimeException(e);
-+ }
- }
-
- /**
-@@ -118,7 +130,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
- LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
- TezSpillRecord tmp = null;
- try {
-- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
-+ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
- } catch (Throwable e) {
- tmp = new TezSpillRecord(0);
- cache.remove(mapId);
diff --git a/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff b/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff
new file mode 100644
index 000000000..2f9030595
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff
@@ -0,0 +1,258 @@
+From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001
+From: amaruthappan <[email protected]>
+Date: Mon, 8 May 2023 17:00:46 -0700
+Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?=
+ =?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?=
+ =?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?=
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+---
+ .../hadoop/hive/llap/shufflehandler/IndexCache.java | 11 +++++++++++
+ 1 file changed, 11 insertions(+)
+
+diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+index 4de03f232d70..c7b986469f4a 100644
+--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+@@ -25,6 +25,7 @@
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.tez.runtime.library.common.Constants;
+ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+@@ -42,11 +43,21 @@ class IndexCache {
+
+ private final LinkedBlockingQueue<String> queue =
+ new LinkedBlockingQueue<String>();
++ private FileSystem fs;
+
+ public IndexCache(Configuration conf) {
+ this.conf = conf;
+ totalMemoryAllowed = 10 * 1024 * 1024;
+ LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
++ initLocalFs();
++ }
++
++ private void initLocalFs() {
++ try {
++ this.fs = FileSystem.getLocal(conf).getRaw();
++ } catch (IOException e) {
++ throw new RuntimeException(e);
++ }
+ }
+
+ /**
+
+From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001
+From: amaruthappan <[email protected]>
+Date: Mon, 8 May 2023 16:58:51 -0700
+Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X
+
+---
+ .../hive/llap/daemon/impl/ContainerRunnerImpl.java | 4 ++--
+ .../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java | 2 +-
+ .../hive/llap/daemon/impl/TaskRunnerCallable.java | 2 +-
+ .../hadoop/hive/llap/shufflehandler/IndexCache.java | 2 +-
+ .../hive/llap/tezplugins/LlapTaskCommunicator.java | 10 +++++-----
+ .../hive/llap/tezplugins/LlapTaskSchedulerService.java | 8 ++++----
+ .../hive/llap/tezplugins/TestLlapTaskCommunicator.java | 1 +
+ pom.xml | 2 +-
+ .../org/apache/hadoop/hive/ql/exec/tez/TezTask.java | 7 +++++++
+ 9 files changed, 23 insertions(+), 15 deletions(-)
+
+diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+index ef5922ef41b6..95d601a8e2f3 100644
+--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+@@ -215,7 +215,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
+ vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber());
+
+ // This is the start of container-annotated logging.
+- final String dagId = attemptId.getTaskID().getVertexID().getDAGId().toString();
++ final String dagId = attemptId.getDAGID().toString();
+ final String queryId = vertex.getHiveQueryId();
+ final String fragmentId = LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+ MDC.put("dagId", dagId);
+@@ -237,7 +237,7 @@ public SubmitWorkResponseProto submitWork(SubmitWorkRequestProto request) throws
+ env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());
+
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.fromString(fragmentIdString);
+- int dagIdentifier = taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
++ int dagIdentifier = taskAttemptId.getDAGID().getId();
+
+ QueryIdentifier queryIdentifier = new QueryIdentifier(
+ qIdProto.getApplicationIdString(), dagIdentifier);
+diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+index 33ade55ee1f5..cc7879cdecea 100644
+--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+@@ -291,7 +291,7 @@ private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) t
+ int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+ int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
+ TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, fromPreRoutedEventId,
+- containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
++ containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending heartbeat to AM, request=" + request);
+ }
+diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+index 7f436e23264b..66f7c330f786 100644
+--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+@@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID taskAttemptId) {
+ StringBuilder sb = new StringBuilder();
+ TezTaskID taskId = taskAttemptId.getTaskID();
+ TezVertexID vertexId = taskId.getVertexID();
+- TezDAGID dagId = vertexId.getDAGId();
++ TezDAGID dagId = vertexId.getDAGID();
+ ApplicationId appId = dagId.getApplicationId();
+ long clusterTs = appId.getClusterTimestamp();
+ long clusterTsShort = clusterTs % 1_000_000L;
+diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+index c7b986469f4a..cc5019a64d84 100644
+--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
++++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+@@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path indexFileName,
+ LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+ TezSpillRecord tmp = null;
+ try {
+- tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
++ tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
+ } catch (Throwable e) {
+ tmp = new TezSpillRecord(0);
+ cache.remove(mapId);
+diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+index 5d4ce223d9e9..5eebe10ac9a3 100644
+--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
++++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+@@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID attemptId, NodeInfo assig
+ UpdateFragmentRequestProto request = UpdateFragmentRequestProto.newBuilder()
+ .setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
+ .setQueryIdentifier(constructQueryIdentifierProto(
+- attemptId.getTaskID().getVertexID().getDAGId().getId())).build();
++ attemptId.getDAGID().getId())).build();
+
+ communicator.sendUpdateFragment(request, nodeId.getHostname(), nodeId.getPort(),
+ new LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
+@@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId containerId, final Task
+ int priority) {
+ super.registerRunningTaskAttempt(containerId, taskSpec, additionalResources, credentials,
+ credentialsChanged, priority);
+- int dagId = taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
++ int dagId = taskSpec.getTaskAttemptID().getDAGID().getId();
+ if (currentQueryIdentifierProto == null || (dagId != currentQueryIdentifierProto.getDagIndex())) {
+ // TODO HiveQueryId extraction by parsing the Processor payload is ugly. This can be improved
+ // once TEZ-2672 is fixed.
+@@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID taskAttemptId,
+ TerminateFragmentRequestProto request =
+ TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
+ constructQueryIdentifierProto(
+- taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
++ taskAttemptId.getDAGID().getId()))
+ .setFragmentIdentifierString(taskAttemptId.toString()).build();
+ communicator.sendTerminateFragment(request, nodeId.getHostname(), nodeId.getPort(),
+ new LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>() {
+@@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID attemptID, final NodeId co
+
+ private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final String containerIdString,
+ final boolean isDone, final String nmAddress) {
+- String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
++ String dagId = attemptID.getDAGID().toString();
+ String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", (isDone ? ".done" : ""),
+ "?nm.id=", nmAddress);
+ String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", "applicationhistory", "containers",
+@@ -794,7 +794,7 @@ private SubmitWorkRequestProto constructSubmitWorkRequest(ContainerId containerI
+ builder.setAmPort(getAddress().getPort());
+
+ Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
+- taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
++ taskSpec.getTaskAttemptID().getDAGID().getId());
+ ByteBuffer credentialsBinary = credentialMap.get(currentQueryIdentifierProto);
+ if (credentialsBinary == null) {
+ credentialsBinary = serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
+diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+index 82179645da00..99038cd49542 100644
+--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
++++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+@@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource capability, String[] hosts, Strin
+ writeLock.lock();
+ try {
+ if (!dagRunning && metrics != null && id != null) {
+- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
++ metrics.setDagId(id.getDAGID().toString());
+ }
+ dagRunning = true;
+ dagStats.registerTaskRequest(hosts, racks);
+@@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
+ writeLock.lock();
+ try {
+ if (!dagRunning && metrics != null && id != null) {
+- metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
++ metrics.setDagId(id.getDAGID().toString());
+ }
+ dagRunning = true;
+ dagStats.registerTaskRequest(null, null);
+@@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource capability, ContainerId container
+ protected TezTaskAttemptID getTaskAttemptId(Object task) {
+ // TODO: why does Tez API use "Object" for this?
+ if (task instanceof TaskAttempt) {
+- return ((TaskAttempt)task).getID();
++ return ((TaskAttempt)task).getTaskAttemptID();
+ }
+ throw new AssertionError("LLAP plugin can only schedule task attempts");
+ }
+@@ -2030,7 +2030,7 @@ private List<TaskInfo> preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r
+ continue; // Not the right host.
+ }
+ Map<Integer,Set<Integer>> depInfo = getDependencyInfo(
+- taskInfo.attemptId.getTaskID().getVertexID().getDAGId());
++ taskInfo.attemptId.getDAGID());
+ Set<Integer> vertexDepInfo = null;
+ if (depInfo != null) {
+ vertexDepInfo = depInfo.get(forVertex);
+diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+index 5efe7c677ce6..2fa2487a74d7 100644
+--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
++++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+@@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, TezVertexID vertexId, int
+ TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
+ TezTaskID.getInstance(vertexId, taskIdx), 0);
+ doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
++ doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID();
+ doReturn(DAG_NAME).when(taskSpec).getDAGName();
+ doReturn(vertexName).when(taskSpec).getVertexName();
+ ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
+diff --git a/pom.xml b/pom.xml
+index cb54806ef5ca..053ccc059f3f 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -196,7 +196,7 @@
+ <slf4j.version>1.7.10</slf4j.version>
+ <ST4.version>4.0.4</ST4.version>
+ <storage-api.version>2.7.0</storage-api.version>
+- <tez.version>0.9.1</tez.version>
++ <tez.version>0.10.2</tez.version>
+ <super-csv.version>2.2.0</super-csv.version>
+ <spark.version>2.3.0</spark.version>
+ <scala.binary.version>2.11</scala.binary.version>
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+index a15482f19c43..288341a2b229 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+@@ -761,5 +761,12 @@ public DAGStatus waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
+ return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
+ }
+ }
++
++ @Override
++ public String getWebUIAddress() throws IOException, TezException {
++ synchronized (dagClient) {
++ return dagClient.getWebUIAddress();
++ }
++ }
+ }
+ }