This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6c6bddcef3e [HUDI-7626] Propagate UserGroupInformation from the main
thread to the new thread of timeline service threadpool (#11039)
6c6bddcef3e is described below
commit 6c6bddcef3ec383b08eb10f10ab0400f4edc41f4
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Apr 17 16:40:29 2024 +0800
[HUDI-7626] Propagate UserGroupInformation from the main thread to the new
thread of timeline service threadpool (#11039)
---
.../hudi/timeline/service/RequestHandler.java | 128 +++++++++++----------
1 file changed, 70 insertions(+), 58 deletions(-)
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 8ca50462cd2..1cc45ea23d4 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -54,11 +54,13 @@ import io.javalin.http.Context;
import io.javalin.http.Handler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.security.UserGroupInformation;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -593,76 +595,86 @@ public class RequestHandler {
private final Handler handler;
private final boolean performRefreshCheck;
+ private final UserGroupInformation ugi;
ViewHandler(Handler handler, boolean performRefreshCheck) {
this.handler = handler;
this.performRefreshCheck = performRefreshCheck;
+ try {
+ ugi = UserGroupInformation.getCurrentUser();
+ } catch (Exception e) {
+ LOG.warn("Fail to get ugi", e);
+ throw new HoodieException(e);
+ }
}
@Override
public void handle(@NotNull Context context) throws Exception {
- boolean success = true;
- long beginTs = System.currentTimeMillis();
- boolean synced = false;
- boolean refreshCheck = performRefreshCheck &&
!isRefreshCheckDisabledInQuery(context);
- long refreshCheckTimeTaken = 0;
- long handleTimeTaken = 0;
- long finalCheckTimeTaken = 0;
- try {
- if (refreshCheck) {
- long beginRefreshCheck = System.currentTimeMillis();
- synced = syncIfLocalViewBehind(context);
- long endRefreshCheck = System.currentTimeMillis();
- refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
- }
+ ugi.doAs((PrivilegedExceptionAction<Void>) () -> {
+ boolean success = true;
+ long beginTs = System.currentTimeMillis();
+ boolean synced = false;
+ boolean refreshCheck = performRefreshCheck &&
!isRefreshCheckDisabledInQuery(context);
+ long refreshCheckTimeTaken = 0;
+ long handleTimeTaken = 0;
+ long finalCheckTimeTaken = 0;
+ try {
+ if (refreshCheck) {
+ long beginRefreshCheck = System.currentTimeMillis();
+ synced = syncIfLocalViewBehind(context);
+ long endRefreshCheck = System.currentTimeMillis();
+ refreshCheckTimeTaken = endRefreshCheck - beginRefreshCheck;
+ }
- long handleBeginMs = System.currentTimeMillis();
- handler.handle(context);
- long handleEndMs = System.currentTimeMillis();
- handleTimeTaken = handleEndMs - handleBeginMs;
-
- if (refreshCheck) {
- long beginFinalCheck = System.currentTimeMillis();
- if (isLocalViewBehind(context)) {
- String lastKnownInstantFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS,
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
- String timelineHashFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH,
String.class).getOrDefault("");
- HoodieTimeline localTimeline =
-
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
- if (shouldThrowExceptionIfLocalViewBehind(localTimeline,
timelineHashFromClient)) {
- String errMsg =
- "Last known instant from client was "
- + lastKnownInstantFromClient
- + " but server has the following timeline "
- + localTimeline.getInstants();
- throw new BadRequestResponse(errMsg);
+ long handleBeginMs = System.currentTimeMillis();
+ handler.handle(context);
+ long handleEndMs = System.currentTimeMillis();
+ handleTimeTaken = handleEndMs - handleBeginMs;
+
+ if (refreshCheck) {
+ long beginFinalCheck = System.currentTimeMillis();
+ if (isLocalViewBehind(context)) {
+ String lastKnownInstantFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS,
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
+ String timelineHashFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH,
String.class).getOrDefault("");
+ HoodieTimeline localTimeline =
+
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
+ if (shouldThrowExceptionIfLocalViewBehind(localTimeline,
timelineHashFromClient)) {
+ String errMsg =
+ "Last known instant from client was "
+ + lastKnownInstantFromClient
+ + " but server has the following timeline "
+ + localTimeline.getInstants();
+ throw new BadRequestResponse(errMsg);
+ }
}
+ long endFinalCheck = System.currentTimeMillis();
+ finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
}
- long endFinalCheck = System.currentTimeMillis();
- finalCheckTimeTaken = endFinalCheck - beginFinalCheck;
- }
- } catch (RuntimeException re) {
- success = false;
- if (re instanceof BadRequestResponse) {
- LOG.warn("Bad request response due to client view behind server
view. " + re.getMessage());
- } else {
- LOG.error("Got runtime exception servicing request " +
context.queryString(), re);
+ } catch (RuntimeException re) {
+ success = false;
+ if (re instanceof BadRequestResponse) {
+ LOG.warn("Bad request response due to client view behind server
view. " + re.getMessage());
+ } else {
+ LOG.error("Got runtime exception servicing request " +
context.queryString(), re);
+ }
+ throw re;
+ } finally {
+ long endTs = System.currentTimeMillis();
+ long timeTakenMillis = endTs - beginTs;
+ metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
+ metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
+ metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
+ metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
+ metricsRegistry.add("TOTAL_API_CALLS", 1);
+
+ LOG.debug(String.format(
+ "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+ + "Success=%s, Query=%s, Host=%s, synced=%s",
+ timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken,
finalCheckTimeTaken, success,
+ context.queryString(), context.host(), synced));
}
- throw re;
- } finally {
- long endTs = System.currentTimeMillis();
- long timeTakenMillis = endTs - beginTs;
- metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
- metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
- metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
- metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
- metricsRegistry.add("TOTAL_API_CALLS", 1);
-
- LOG.debug(String.format(
- "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
- + "Success=%s, Query=%s, Host=%s, synced=%s",
- timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken,
finalCheckTimeTaken, success,
- context.queryString(), context.host(), synced));
- }
+ return null;
+ });
}
}
}