This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c6bddcef3e [HUDI-7626] Propagate UserGroupInformation from the main thread to the new thread of timeline service threadpool (#11039)
6c6bddcef3e is described below

commit 6c6bddcef3ec383b08eb10f10ab0400f4edc41f4
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Apr 17 16:40:29 2024 +0800

    [HUDI-7626] Propagate UserGroupInformation from the main thread to the new thread of timeline service threadpool (#11039)
---
 .../hudi/timeline/service/RequestHandler.java      | 128 +++++++++++----------
 1 file changed, 70 insertions(+), 58 deletions(-)

diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 8ca50462cd2..1cc45ea23d4 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -54,11 +54,13 @@ import io.javalin.http.Context;
 import io.javalin.http.Handler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -593,76 +595,86 @@ public class RequestHandler {
 
     private final Handler handler;
     private final boolean performRefreshCheck;
+    private final UserGroupInformation ugi;
 
     ViewHandler(Handler handler, boolean performRefreshCheck) {
       this.handler = handler;
       this.performRefreshCheck = performRefreshCheck;
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+      } catch (Exception e) {
+        LOG.warn("Fail to get ugi", e);
+        throw new HoodieException(e);
+      }
     }
 
     @Override
     public void handle(@NotNull Context context) throws Exception {
-      boolean success = true;
-      long beginTs = System.currentTimeMillis();
-      boolean synced = false;
-      boolean refreshCheck = performRefreshCheck && 
!isRefreshCheckDisabledInQuery(context);
-      long refreshCheckTimeTaken = 0;
-      long handleTimeTaken = 0;
-      long finalCheckTimeTaken = 0;
-      try {
-        if (refreshCheck) {
-          long beginRefreshCheck = System.currentTimeMillis();
-          synced = syncIfLocalViewBehind(context);
-          long endRefreshCheck = System.currentTimeMillis();
-          refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
-        }
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        boolean success = true;
+        long beginTs = System.currentTimeMillis();
+        boolean synced = false;
+        boolean refreshCheck = performRefreshCheck && 
!isRefreshCheckDisabledInQuery(context);
+        long refreshCheckTimeTaken = 0;
+        long handleTimeTaken = 0;
+        long finalCheckTimeTaken = 0;
+        try {
+          if (refreshCheck) {
+            long beginRefreshCheck = System.currentTimeMillis();
+            synced = syncIfLocalViewBehind(context);
+            long endRefreshCheck = System.currentTimeMillis();
+            refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
+          }
 
-        long handleBeginMs = System.currentTimeMillis();
-        handler.handle(context);
-        long handleEndMs = System.currentTimeMillis();
-        handleTimeTaken = handleEndMs - handleBeginMs;
-
-        if (refreshCheck) {
-          long beginFinalCheck = System.currentTimeMillis();
-          if (isLocalViewBehind(context)) {
-            String lastKnownInstantFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, 
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
-            String timelineHashFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, 
String.class).getOrDefault("");
-            HoodieTimeline localTimeline =
-                
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
-            if (shouldThrowExceptionIfLocalViewBehind(localTimeline, 
timelineHashFromClient)) {
-              String errMsg =
-                  "Last known instant from client was "
-                      + lastKnownInstantFromClient
-                      + " but server has the following timeline "
-                      + localTimeline.getInstants();
-              throw new BadRequestResponse(errMsg);
+          long handleBeginMs = System.currentTimeMillis();
+          handler.handle(context);
+          long handleEndMs = System.currentTimeMillis();
+          handleTimeTaken = handleEndMs - handleBeginMs;
+
+          if (refreshCheck) {
+            long beginFinalCheck = System.currentTimeMillis();
+            if (isLocalViewBehind(context)) {
+              String lastKnownInstantFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, 
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
+              String timelineHashFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, 
String.class).getOrDefault("");
+              HoodieTimeline localTimeline =
+                  
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
+              if (shouldThrowExceptionIfLocalViewBehind(localTimeline, 
timelineHashFromClient)) {
+                String errMsg =
+                    "Last known instant from client was "
+                        + lastKnownInstantFromClient
+                        + " but server has the following timeline "
+                        + localTimeline.getInstants();
+                throw new BadRequestResponse(errMsg);
+              }
             }
+            long endFinalCheck = System.currentTimeMillis();
+            finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
           }
-          long endFinalCheck = System.currentTimeMillis();
-          finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
-        }
-      } catch (RuntimeException re) {
-        success = false;
-        if (re instanceof BadRequestResponse) {
-          LOG.warn("Bad request response due to client view behind server 
view. " + re.getMessage());
-        } else {
-          LOG.error("Got runtime exception servicing request " + 
context.queryString(), re);
+        } catch (RuntimeException re) {
+          success = false;
+          if (re instanceof BadRequestResponse) {
+            LOG.warn("Bad request response due to client view behind server 
view. " + re.getMessage());
+          } else {
+            LOG.error("Got runtime exception servicing request " + 
context.queryString(), re);
+          }
+          throw re;
+        } finally {
+          long endTs = System.currentTimeMillis();
+          long timeTakenMillis = endTs - beginTs;
+          metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
+          metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
+          metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
+          metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
+          metricsRegistry.add("TOTAL_API_CALLS", 1);
+
+          LOG.debug(String.format(
+              "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+                  + "Success=%s, Query=%s, Host=%s, synced=%s",
+              timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, 
finalCheckTimeTaken, success,
+              context.queryString(), context.host(), synced));
         }
-        throw re;
-      } finally {
-        long endTs = System.currentTimeMillis();
-        long timeTakenMillis = endTs - beginTs;
-        metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
-        metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
-        metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
-        metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
-        metricsRegistry.add("TOTAL_API_CALLS", 1);
-
-        LOG.debug(String.format(
-            "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
-                + "Success=%s, Query=%s, Host=%s, synced=%s",
-            timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, 
finalCheckTimeTaken, success,
-            context.queryString(), context.host(), synced));
-      }
+        return null;
+      });
     }
   }
 }

Reply via email to