This is an automated email from the ASF dual-hosted git repository.

iwasakims pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/bigtop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 29d3ad50a BIGTOP-4022. Fix build failure of Hive against Tez 0.10.2. 
(#1196)
29d3ad50a is described below

commit 29d3ad50a79d52f0889b40b5a08deadf610a7720
Author: Kengo Seki <[email protected]>
AuthorDate: Wed Oct 18 13:26:54 2023 +0900

    BIGTOP-4022. Fix build failure of Hive against Tez 0.10.2. (#1196)
    
    (cherry picked from commit 157e19e87d6881f295385947409f1ce9c687f075)
---
 .../src/common/hive/patch1-HIVE-23190.diff         |  43 ----
 .../src/common/hive/patch1-HIVE-27336.diff         | 258 +++++++++++++++++++++
 2 files changed, 258 insertions(+), 43 deletions(-)

diff --git a/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff 
b/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff
deleted file mode 100644
index 0eea57de0..000000000
--- a/bigtop-packages/src/common/hive/patch1-HIVE-23190.diff
+++ /dev/null
@@ -1,43 +0,0 @@
-diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-index 4de03f232d..d8c18776ec 100644
---- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
-@@ -25,6 +25,7 @@
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.apache.hadoop.conf.Configuration;
-+import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.tez.runtime.library.common.Constants;
- import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-@@ -43,10 +44,21 @@
-   private final LinkedBlockingQueue<String> queue =
-       new LinkedBlockingQueue<String>();
- 
-+  private FileSystem fs;
-+
-   public IndexCache(Configuration conf) {
-     this.conf = conf;
-     totalMemoryAllowed = 10 * 1024 * 1024;
-     LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
-+    initLocalFs();
-+  }
-+
-+  private void initLocalFs() {
-+    try {
-+      this.fs = FileSystem.getLocal(conf).getRaw();
-+    } catch (IOException e) {
-+      throw new RuntimeException(e);
-+    }
-   }
- 
-   /**
-@@ -118,7 +130,7 @@ private IndexInformation readIndexFileToCache(Path 
indexFileName,
-     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
-     TezSpillRecord tmp = null;
-     try {
--      tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
-+      tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
-     } catch (Throwable e) {
-       tmp = new TezSpillRecord(0);
-       cache.remove(mapId);
diff --git a/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff 
b/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff
new file mode 100644
index 000000000..2f9030595
--- /dev/null
+++ b/bigtop-packages/src/common/hive/patch1-HIVE-27336.diff
@@ -0,0 +1,258 @@
+From 5f79f2a059534eaa63c0479a2142a250fa78c1e3 Mon Sep 17 00:00:00 2001
+From: amaruthappan <[email protected]>
+Date: Mon, 8 May 2023 17:00:46 -0700
+Subject: [PATCH 1/2] =?UTF-8?q?HIVE-23190:=20LLAP:=20modify=20IndexCache?=
+ =?UTF-8?q?=20to=20pass=20filesystem=20object=20to=20TezSpillRecord=20(L?=
+ =?UTF-8?q?=C3=A1szl=C3=B3=20Bodor=20reviewed=20by=20Rajesh=20Balamohan)?=
+MIME-Version: 1.0
+Content-Type: text/plain; charset=UTF-8
+Content-Transfer-Encoding: 8bit
+
+---
+ .../hadoop/hive/llap/shufflehandler/IndexCache.java   | 11 +++++++++++
+ 1 file changed, 11 insertions(+)
+
+diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+index 4de03f232d70..c7b986469f4a 100644
+--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+@@ -25,6 +25,7 @@
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ import org.apache.hadoop.conf.Configuration;
++import org.apache.hadoop.fs.FileSystem;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.tez.runtime.library.common.Constants;
+ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+@@ -42,11 +43,21 @@ class IndexCache {
+ 
+   private final LinkedBlockingQueue<String> queue =
+       new LinkedBlockingQueue<String>();
++  private FileSystem fs;
+ 
+   public IndexCache(Configuration conf) {
+     this.conf = conf;
+     totalMemoryAllowed = 10 * 1024 * 1024;
+     LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
++    initLocalFs();
++  }
++
++  private void initLocalFs() {
++    try {
++      this.fs = FileSystem.getLocal(conf).getRaw();
++    } catch (IOException e) {
++      throw new RuntimeException(e);
++    }
+   }
+ 
+   /**
+
+From 1edbe403ff424f91ed0cd1ae91eb39290b5beb7f Mon Sep 17 00:00:00 2001
+From: amaruthappan <[email protected]>
+Date: Mon, 8 May 2023 16:58:51 -0700
+Subject: [PATCH 2/2] HIVE-27336: Upgrade Tez to 0.10.2 in Hive-3.X
+
+---
+ .../hive/llap/daemon/impl/ContainerRunnerImpl.java     |  4 ++--
+ .../hadoop/hive/llap/daemon/impl/LlapTaskReporter.java |  2 +-
+ .../hive/llap/daemon/impl/TaskRunnerCallable.java      |  2 +-
+ .../hadoop/hive/llap/shufflehandler/IndexCache.java    |  2 +-
+ .../hive/llap/tezplugins/LlapTaskCommunicator.java     | 10 +++++-----
+ .../hive/llap/tezplugins/LlapTaskSchedulerService.java |  8 ++++----
+ .../hive/llap/tezplugins/TestLlapTaskCommunicator.java |  1 +
+ pom.xml                                                |  2 +-
+ .../org/apache/hadoop/hive/ql/exec/tez/TezTask.java    |  7 +++++++
+ 9 files changed, 23 insertions(+), 15 deletions(-)
+
+diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+index ef5922ef41b6..95d601a8e2f3 100644
+--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+@@ -215,7 +215,7 @@ public SubmitWorkResponseProto 
submitWork(SubmitWorkRequestProto request) throws
+         vertex.getVertexName(), request.getFragmentNumber(), 
request.getAttemptNumber());
+ 
+     // This is the start of container-annotated logging.
+-    final String dagId = 
attemptId.getTaskID().getVertexID().getDAGId().toString();
++    final String dagId = attemptId.getDAGID().toString();
+     final String queryId = vertex.getHiveQueryId();
+     final String fragmentId = 
LlapTezUtils.stripAttemptPrefix(fragmentIdString);
+     MDC.put("dagId", dagId);
+@@ -237,7 +237,7 @@ public SubmitWorkResponseProto 
submitWork(SubmitWorkRequestProto request) throws
+       env.put(ApplicationConstants.Environment.USER.name(), vertex.getUser());
+ 
+       TezTaskAttemptID taskAttemptId = 
TezTaskAttemptID.fromString(fragmentIdString);
+-      int dagIdentifier = 
taskAttemptId.getTaskID().getVertexID().getDAGId().getId();
++      int dagIdentifier = taskAttemptId.getDAGID().getId();
+ 
+       QueryIdentifier queryIdentifier = new QueryIdentifier(
+           qIdProto.getApplicationIdString(), dagIdentifier);
+diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+index 33ade55ee1f5..cc7879cdecea 100644
+--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapTaskReporter.java
+@@ -291,7 +291,7 @@ private synchronized ResponseWrapper 
heartbeat(Collection<TezEvent> eventsArg) t
+       int fromPreRoutedEventId = task.getNextPreRoutedEventId();
+       int maxEvents = Math.min(maxEventsToGet, task.getMaxEventsToHandle());
+       TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, 
events, fromPreRoutedEventId,
+-          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents);
++          containerIdStr, task.getTaskAttemptID(), fromEventId, maxEvents, 0);
+       if (LOG.isDebugEnabled()) {
+         LOG.debug("Sending heartbeat to AM, request=" + request);
+       }
+diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+index 7f436e23264b..66f7c330f786 100644
+--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+@@ -327,7 +327,7 @@ private String constructThreadNameSuffix(TezTaskAttemptID 
taskAttemptId) {
+     StringBuilder sb = new StringBuilder();
+     TezTaskID taskId = taskAttemptId.getTaskID();
+     TezVertexID vertexId = taskId.getVertexID();
+-    TezDAGID dagId = vertexId.getDAGId();
++    TezDAGID dagId = vertexId.getDAGID();
+     ApplicationId appId = dagId.getApplicationId();
+     long clusterTs = appId.getClusterTimestamp();
+     long clusterTsShort = clusterTs % 1_000_000L;
+diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+index c7b986469f4a..cc5019a64d84 100644
+--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
++++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+@@ -129,7 +129,7 @@ private IndexInformation readIndexFileToCache(Path 
indexFileName,
+     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
+     TezSpillRecord tmp = null;
+     try {
+-      tmp = new TezSpillRecord(indexFileName, conf, expectedIndexOwner);
++      tmp = new TezSpillRecord(indexFileName, fs, expectedIndexOwner);
+     } catch (Throwable e) {
+       tmp = new TezSpillRecord(0);
+       cache.remove(mapId);
+diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+index 5d4ce223d9e9..5eebe10ac9a3 100644
+--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
++++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+@@ -322,7 +322,7 @@ public <T> void startUpdateGuaranteed(TezTaskAttemptID 
attemptId, NodeInfo assig
+     UpdateFragmentRequestProto request = 
UpdateFragmentRequestProto.newBuilder()
+         
.setIsGuaranteed(newState).setFragmentIdentifierString(attemptId.toString())
+         .setQueryIdentifier(constructQueryIdentifierProto(
+-            attemptId.getTaskID().getVertexID().getDAGId().getId())).build();
++            attemptId.getDAGID().getId())).build();
+ 
+     communicator.sendUpdateFragment(request, nodeId.getHostname(), 
nodeId.getPort(),
+         new 
LlapProtocolClientProxy.ExecuteRequestCallback<UpdateFragmentResponseProto>() {
+@@ -349,7 +349,7 @@ public void registerRunningTaskAttempt(final ContainerId 
containerId, final Task
+                                          int priority)  {
+     super.registerRunningTaskAttempt(containerId, taskSpec, 
additionalResources, credentials,
+         credentialsChanged, priority);
+-    int dagId = 
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId();
++    int dagId = taskSpec.getTaskAttemptID().getDAGID().getId();
+     if (currentQueryIdentifierProto == null || (dagId != 
currentQueryIdentifierProto.getDagIndex())) {
+       // TODO HiveQueryId extraction by parsing the Processor payload is 
ugly. This can be improved
+       // once TEZ-2672 is fixed.
+@@ -505,7 +505,7 @@ private void sendTaskTerminated(final TezTaskAttemptID 
taskAttemptId,
+       TerminateFragmentRequestProto request =
+           TerminateFragmentRequestProto.newBuilder().setQueryIdentifier(
+               constructQueryIdentifierProto(
+-                  taskAttemptId.getTaskID().getVertexID().getDAGId().getId()))
++                  taskAttemptId.getDAGID().getId()))
+               .setFragmentIdentifierString(taskAttemptId.toString()).build();
+       communicator.sendTerminateFragment(request, nodeId.getHostname(), 
nodeId.getPort(),
+           new 
LlapProtocolClientProxy.ExecuteRequestCallback<TerminateFragmentResponseProto>()
 {
+@@ -649,7 +649,7 @@ private String constructLogUrl(final TezTaskAttemptID 
attemptID, final NodeId co
+ 
+   private String constructLlapLogUrl(final TezTaskAttemptID attemptID, final 
String containerIdString,
+     final boolean isDone, final String nmAddress) {
+-    String dagId = attemptID.getTaskID().getVertexID().getDAGId().toString();
++    String dagId = attemptID.getDAGID().toString();
+     String filename = JOINER.join(currentHiveQueryId, "-", dagId, ".log", 
(isDone ? ".done" : ""),
+       "?nm.id=", nmAddress);
+     String url = PATH_JOINER.join(timelineServerUri, "ws", "v1", 
"applicationhistory", "containers",
+@@ -794,7 +794,7 @@ private SubmitWorkRequestProto 
constructSubmitWorkRequest(ContainerId containerI
+     builder.setAmPort(getAddress().getPort());
+ 
+     Preconditions.checkState(currentQueryIdentifierProto.getDagIndex() ==
+-        
taskSpec.getTaskAttemptID().getTaskID().getVertexID().getDAGId().getId());
++        taskSpec.getTaskAttemptID().getDAGID().getId());
+     ByteBuffer credentialsBinary = 
credentialMap.get(currentQueryIdentifierProto);
+     if (credentialsBinary == null) {
+       credentialsBinary = 
serializeCredentials(getContext().getCurrentDagInfo().getCredentials());
+diff --git 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+index 82179645da00..99038cd49542 100644
+--- 
a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
++++ 
b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+@@ -1075,7 +1075,7 @@ public void allocateTask(Object task, Resource 
capability, String[] hosts, Strin
+     writeLock.lock();
+     try {
+       if (!dagRunning && metrics != null && id != null) {
+-        metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
++        metrics.setDagId(id.getDAGID().toString());
+       }
+       dagRunning = true;
+       dagStats.registerTaskRequest(hosts, racks);
+@@ -1099,7 +1099,7 @@ public void allocateTask(Object task, Resource 
capability, ContainerId container
+     writeLock.lock();
+     try {
+       if (!dagRunning && metrics != null && id != null) {
+-        metrics.setDagId(id.getTaskID().getVertexID().getDAGId().toString());
++        metrics.setDagId(id.getDAGID().toString());
+       }
+       dagRunning = true;
+       dagStats.registerTaskRequest(null, null);
+@@ -1114,7 +1114,7 @@ public void allocateTask(Object task, Resource 
capability, ContainerId container
+   protected TezTaskAttemptID getTaskAttemptId(Object task) {
+     // TODO: why does Tez API use "Object" for this?
+     if (task instanceof TaskAttempt) {
+-      return ((TaskAttempt)task).getID();
++      return ((TaskAttempt)task).getTaskAttemptID();
+     }
+     throw new AssertionError("LLAP plugin can only schedule task attempts");
+   }
+@@ -2030,7 +2030,7 @@ private List<TaskInfo> 
preemptTasksFromMap(TreeMap<Integer, TreeSet<TaskInfo>> r
+             continue; // Not the right host.
+           }
+           Map<Integer,Set<Integer>> depInfo = getDependencyInfo(
+-              taskInfo.attemptId.getTaskID().getVertexID().getDAGId());
++              taskInfo.attemptId.getDAGID());
+           Set<Integer> vertexDepInfo = null;
+           if (depInfo != null) {
+             vertexDepInfo = depInfo.get(forVertex);
+diff --git 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+index 5efe7c677ce6..2fa2487a74d7 100644
+--- 
a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
++++ 
b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskCommunicator.java
+@@ -364,6 +364,7 @@ private TaskSpec createBaseTaskSpec(String vertexName, 
TezVertexID vertexId, int
+       TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(
+           TezTaskID.getInstance(vertexId, taskIdx), 0);
+       doReturn(taskAttemptId).when(taskSpec).getTaskAttemptID();
++      doReturn(taskAttemptId.getDAGID()).when(taskSpec).getDAGID();
+       doReturn(DAG_NAME).when(taskSpec).getDAGName();
+       doReturn(vertexName).when(taskSpec).getVertexName();
+       ProcessorDescriptor processorDescriptor = 
ProcessorDescriptor.create("fakeClassName").setUserPayload(userPayload);
+diff --git a/pom.xml b/pom.xml
+index cb54806ef5ca..053ccc059f3f 100644
+--- a/pom.xml
++++ b/pom.xml
+@@ -196,7 +196,7 @@
+     <slf4j.version>1.7.10</slf4j.version>
+     <ST4.version>4.0.4</ST4.version>
+     <storage-api.version>2.7.0</storage-api.version>
+-    <tez.version>0.9.1</tez.version>
++    <tez.version>0.10.2</tez.version>
+     <super-csv.version>2.2.0</super-csv.version>
+     <spark.version>2.3.0</spark.version>
+     <scala.binary.version>2.11</scala.binary.version>
+diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+index a15482f19c43..288341a2b229 100644
+--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
++++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+@@ -761,5 +761,12 @@ public DAGStatus 
waitForCompletionWithStatusUpdates(@Nullable Set<StatusGetOpts>
+         return dagClient.waitForCompletionWithStatusUpdates(statusGetOpts);
+       }
+     }
++
++    @Override
++    public String getWebUIAddress() throws IOException, TezException {
++      synchronized (dagClient) {
++        return dagClient.getWebUIAddress();
++      }
++    }
+   }
+ }

Reply via email to