This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 585078471fe30273ec5ac1552fb5575c5beea369
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Apr 17 16:40:29 2024 +0800

    [HUDI-7626] Propagate UserGroupInformation from the main thread to the new thread of timeline service threadpool (#11039)
---
 .../hudi/timeline/service/RequestHandler.java      | 128 +++++++++++----------
 1 file changed, 70 insertions(+), 58 deletions(-)

diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 9385b4eca9e..12e11db403d 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -52,11 +52,13 @@ import io.javalin.http.Context;
 import io.javalin.http.Handler;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -563,76 +565,86 @@ public class RequestHandler {

     private final Handler handler;
     private final boolean performRefreshCheck;
+    private final UserGroupInformation ugi;

     ViewHandler(Handler handler, boolean performRefreshCheck) {
       this.handler = handler;
       this.performRefreshCheck = performRefreshCheck;
+      try {
+        ugi = UserGroupInformation.getCurrentUser();
+      } catch (Exception e) {
+        LOG.warn("Fail to get ugi", e);
+        throw new HoodieException(e);
+      }
     }

     @Override
     public void handle(@NotNull Context context) throws Exception {
-      boolean success = true;
-      long beginTs = System.currentTimeMillis();
-      boolean synced = false;
-      boolean refreshCheck = performRefreshCheck && !isRefreshCheckDisabledInQuery(context);
-      long refreshCheckTimeTaken = 0;
-      long handleTimeTaken = 0;
-      long finalCheckTimeTaken = 0;
-      try {
-        if (refreshCheck) {
-          long beginRefreshCheck = System.currentTimeMillis();
-          synced = syncIfLocalViewBehind(context);
-          long endRefreshCheck = System.currentTimeMillis();
-          refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
-        }
+      ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+        boolean success = true;
+        long beginTs = System.currentTimeMillis();
+        boolean synced = false;
+        boolean refreshCheck = performRefreshCheck && !isRefreshCheckDisabledInQuery(context);
+        long refreshCheckTimeTaken = 0;
+        long handleTimeTaken = 0;
+        long finalCheckTimeTaken = 0;
+        try {
+          if (refreshCheck) {
+            long beginRefreshCheck = System.currentTimeMillis();
+            synced = syncIfLocalViewBehind(context);
+            long endRefreshCheck = System.currentTimeMillis();
+            refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
+          }

-        long handleBeginMs = System.currentTimeMillis();
-        handler.handle(context);
-        long handleEndMs = System.currentTimeMillis();
-        handleTimeTaken = handleEndMs - handleBeginMs;
-
-        if (refreshCheck) {
-          long beginFinalCheck = System.currentTimeMillis();
-          if (isLocalViewBehind(context)) {
-            String lastKnownInstantFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
-            String timelineHashFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault("");
-            HoodieTimeline localTimeline =
-                viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
-            if (shouldThrowExceptionIfLocalViewBehind(localTimeline, timelineHashFromClient)) {
-              String errMsg =
-                  "Last known instant from client was "
-                      + lastKnownInstantFromClient
-                      + " but server has the following timeline "
-                      + localTimeline.getInstants();
-              throw new BadRequestResponse(errMsg);
+          long handleBeginMs = System.currentTimeMillis();
+          handler.handle(context);
+          long handleEndMs = System.currentTimeMillis();
+          handleTimeTaken = handleEndMs - handleBeginMs;
+
+          if (refreshCheck) {
+            long beginFinalCheck = System.currentTimeMillis();
+            if (isLocalViewBehind(context)) {
+              String lastKnownInstantFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
+              String timelineHashFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault("");
+              HoodieTimeline localTimeline =
+                  viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
+              if (shouldThrowExceptionIfLocalViewBehind(localTimeline, timelineHashFromClient)) {
+                String errMsg =
+                    "Last known instant from client was "
+                        + lastKnownInstantFromClient
+                        + " but server has the following timeline "
+                        + localTimeline.getInstants();
+                throw new BadRequestResponse(errMsg);
+              }
             }
+            long endFinalCheck = System.currentTimeMillis();
+            finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
           }
-          long endFinalCheck = System.currentTimeMillis();
-          finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
-        }
-      } catch (RuntimeException re) {
-        success = false;
-        if (re instanceof BadRequestResponse) {
-          LOG.warn("Bad request response due to client view behind server view. " + re.getMessage());
-        } else {
-          LOG.error("Got runtime exception servicing request " + context.queryString(), re);
+        } catch (RuntimeException re) {
+          success = false;
+          if (re instanceof BadRequestResponse) {
+            LOG.warn("Bad request response due to client view behind server view. " + re.getMessage());
+          } else {
+            LOG.error("Got runtime exception servicing request " + context.queryString(), re);
+          }
+          throw re;
+        } finally {
+          long endTs = System.currentTimeMillis();
+          long timeTakenMillis = endTs - beginTs;
+          metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
+          metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
+          metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
+          metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
+          metricsRegistry.add("TOTAL_API_CALLS", 1);
+
+          LOG.debug(String.format(
+              "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+                  + "Success=%s, Query=%s, Host=%s, synced=%s",
+              timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
+              context.queryString(), context.host(), synced));
         }
-        throw re;
-      } finally {
-        long endTs = System.currentTimeMillis();
-        long timeTakenMillis = endTs - beginTs;
-        metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
-        metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
-        metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
-        metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
-        metricsRegistry.add("TOTAL_API_CALLS", 1);
-
-        LOG.debug(String.format(
-            "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
-                + "Success=%s, Query=%s, Host=%s, synced=%s",
-            timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success,
-            context.queryString(), context.host(), synced));
-      }
+        return null;
+      });
     }
   }
 }
