This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f11a6c7  [HUDI-1553] Configuration and metrics for the 
TimelineService. (#2495)
f11a6c7 is described below

commit f11a6c7b2d4ef045419a4522e8e203f51292b816
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Mar 2 21:58:41 2021 -0800

    [HUDI-1553] Configuration and metrics for the TimelineService. (#2495)
---
 .../embedded/EmbeddedTimelineServerHelper.java     |  4 +-
 .../client/embedded/EmbeddedTimelineService.java   | 11 ++-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 22 +++++-
 .../hudi/timeline/service/RequestHandler.java      | 85 +++++++++++++++++++++-
 .../hudi/timeline/service/TimelineService.java     | 40 ++++++++--
 5 files changed, 147 insertions(+), 15 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
index fa74aa3..e5a719e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
@@ -50,7 +50,9 @@ public class EmbeddedTimelineServerHelper {
       LOG.info("Starting Timeline service !!");
       Option<String> hostAddr = 
context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
       timelineServer = Option.of(new EmbeddedTimelineService(context, 
hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
-          config.getMetadataConfig(), 
config.getClientSpecifiedViewStorageConfig(), config.getBasePath()));
+          config.getMetadataConfig(), 
config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
+          config.getEmbeddedTimelineServerThreads(), 
config.getEmbeddedTimelineServerCompressOutput(),
+          config.getEmbeddedTimelineServerUseAsync()));
       timelineServer.get().startServer();
       updateWriteConfigWithTimelineServer(timelineServer.get(), config);
     }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 386f7d5..a2bc711 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -48,11 +48,15 @@ public class EmbeddedTimelineService {
   private final HoodieMetadataConfig metadataConfig;
   private final String basePath;
 
+  private final int numThreads;
+  private final boolean shouldCompressOutput;
+  private final boolean useAsync;
   private transient FileSystemViewManager viewManager;
   private transient TimelineService server;
 
   public EmbeddedTimelineService(HoodieEngineContext context, String 
embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
-                                 HoodieMetadataConfig metadataConfig, 
FileSystemViewStorageConfig config, String basePath) {
+                                 HoodieMetadataConfig metadataConfig, 
FileSystemViewStorageConfig config, String basePath,
+                                 int numThreads, boolean compressOutput, 
boolean useAsync) {
     setHostAddr(embeddedTimelineServiceHostAddr);
     this.context = context;
     this.config = config;
@@ -61,6 +65,9 @@ public class EmbeddedTimelineService {
     this.hadoopConf = context.getHadoopConf();
     this.viewManager = createViewManager();
     this.preferredPort = embeddedTimelineServerPort;
+    this.numThreads = numThreads;
+    this.shouldCompressOutput = compressOutput;
+    this.useAsync = useAsync;
   }
 
   private FileSystemViewManager createViewManager() {
@@ -77,7 +84,7 @@ public class EmbeddedTimelineService {
   }
 
   public void startServer() throws IOException {
-    server = new TimelineService(preferredPort, viewManager, 
hadoopConf.newCopy());
+    server = new TimelineService(preferredPort, viewManager, 
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
     serverPort = server.startService();
     LOG.info("Started embedded timeline server at " + hostAddr + ":" + 
serverPort);
   }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ee0e3fa..4e493e4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -113,6 +113,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
   public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
   public static final String EMBEDDED_TIMELINE_SERVER_PORT = 
"hoodie.embed.timeline.server.port";
   public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
+  public static final String EMBEDDED_TIMELINE_SERVER_THREADS = 
"hoodie.embed.timeline.server.threads";
+  public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS = "-1";
+  public static final String EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT = 
"hoodie.embed.timeline.server.gzip";
+  public static final String DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT = 
"true";
+  public static final String EMBEDDED_TIMELINE_SERVER_USE_ASYNC = 
"hoodie.embed.timeline.server.async";
+  public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC = "false";
 
   public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = 
"hoodie.fail.on.timeline.archiving";
   public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = 
"true";
@@ -317,6 +323,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT, 
DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
   }
 
+  public int getEmbeddedTimelineServerThreads() {
+    return 
Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_THREADS, 
DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS));
+  }
+
+  public boolean getEmbeddedTimelineServerCompressOutput() {
+    return 
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT,
 DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT));
+  }
+
+  public boolean getEmbeddedTimelineServerUseAsync() {
+    return 
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_USE_ASYNC, 
DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC));
+  }
+
   public boolean isFailOnTimelineArchivingEnabled() {
     return 
Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
   }
@@ -497,7 +515,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public long getClusteringMaxBytesInGroup() {
     return 
Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP));
   }
-  
+
   public long getClusteringSmallFileLimit() {
     return 
Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT));
   }
@@ -513,7 +531,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   public int getTargetPartitionsForClustering() {
     return 
Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS));
   }
-  
+
   public String getClusteringSortColumns() {
     return 
props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY);
   }
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 22d3082..09ebeb5 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.timeline.service;
 
+import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
@@ -29,7 +30,9 @@ import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
 import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
 import org.apache.hudi.timeline.service.handlers.TimelineHandler;
@@ -47,6 +50,9 @@ import org.jetbrains.annotations.NotNull;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
 /**
@@ -62,13 +68,24 @@ public class RequestHandler {
   private final TimelineHandler instantHandler;
   private final FileSliceHandler sliceHandler;
   private final BaseFileHandler dataFileHandler;
+  private Registry metricsRegistry = Registry.getRegistry("TimelineService");
+  private ScheduledExecutorService asyncResultService = 
Executors.newSingleThreadScheduledExecutor();
+  private final boolean useAsync;
 
-  public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager 
viewManager) throws IOException {
+  public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager 
viewManager, boolean useAsync) throws IOException {
     this.viewManager = viewManager;
     this.app = app;
     this.instantHandler = new TimelineHandler(conf, viewManager);
     this.sliceHandler = new FileSliceHandler(conf, viewManager);
     this.dataFileHandler = new BaseFileHandler(conf, viewManager);
+    this.useAsync = useAsync;
+    if (useAsync) {
+      asyncResultService = Executors.newSingleThreadScheduledExecutor();
+    }
+  }
+
+  public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager 
viewManager) throws IOException {
+    this(app, conf, viewManager, false);
   }
 
   public void register() {
@@ -130,13 +147,44 @@ public class RequestHandler {
   }
 
   private void writeValueAsString(Context ctx, Object obj) throws 
JsonProcessingException {
+    if (useAsync) {
+      writeValueAsStringAsync(ctx, obj);
+    } else {
+      writeValueAsStringSync(ctx, obj);
+    }
+  }
+
+  private void writeValueAsStringSync(Context ctx, Object obj) throws 
JsonProcessingException {
+    HoodieTimer timer = new HoodieTimer().startTimer();
     boolean prettyPrint = ctx.queryParam("pretty") != null;
-    long beginJsonTs = System.currentTimeMillis();
     String result =
         prettyPrint ? 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : 
OBJECT_MAPPER.writeValueAsString(obj);
-    long endJsonTs = System.currentTimeMillis();
-    LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
+    final long jsonifyTime = timer.endTimer();
     ctx.result(result);
+    metricsRegistry.add("WRITE_VALUE_CNT", 1);
+    metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
+    }
+  }
+
+  private void writeValueAsStringAsync(Context ctx, Object obj) throws 
JsonProcessingException {
+    ctx.result(CompletableFuture.supplyAsync(() -> {
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      boolean prettyPrint = ctx.queryParam("pretty") != null;
+      try {
+        String result = prettyPrint ? 
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) : 
OBJECT_MAPPER.writeValueAsString(obj);
+        final long jsonifyTime = timer.endTimer();
+        metricsRegistry.add("WRITE_VALUE_CNT", 1);
+        metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
+        }
+        return result;
+      } catch (JsonProcessingException e) {
+        throw new HoodieException("Failed to JSON encode the value", e);
+      }
+    }, asyncResultService));
   }
 
   /**
@@ -144,12 +192,14 @@ public class RequestHandler {
    */
   private void registerTimelineAPI() {
     app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx 
-> {
+      metricsRegistry.add("LAST_INSTANT", 1);
       List<InstantDTO> dtos = instantHandler
           
.getLastInstant(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
       writeValueAsString(ctx, dtos);
     }, false));
 
     app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> {
+      metricsRegistry.add("TIMELINE", 1);
       TimelineDTO dto = instantHandler
           
.getTimeline(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
       writeValueAsString(ctx, dto);
@@ -161,6 +211,7 @@ public class RequestHandler {
    */
   private void registerDataFilesAPI() {
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -168,6 +219,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -176,12 +228,14 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_ALL_DATA_FILES", 1);
       List<BaseFileDTO> dtos = dataFileHandler
           
.getLatestDataFiles(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
       writeValueAsString(ctx, dtos);
     }, true));
 
     
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL,
 new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -190,6 +244,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -199,6 +254,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_DATA_FILES", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -206,6 +262,7 @@ public class RequestHandler {
     }, true));
 
     
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
 Arrays
               
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -218,6 +275,7 @@ public class RequestHandler {
    */
   private void registerFileSlicesAPI() {
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -225,6 +283,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -233,6 +292,7 @@ public class RequestHandler {
     }, true));
 
     
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL,
 new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -240,6 +300,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -247,6 +308,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_SLICES_RANGE_INSTANT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
 Arrays
               
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -254,6 +316,7 @@ public class RequestHandler {
     }, true));
 
     
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL,
 new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1);
       List<FileSliceDTO> dtos = 
sliceHandler.getLatestMergedFileSlicesBeforeOrOn(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -262,6 +325,7 @@ public class RequestHandler {
     }, true));
 
     
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -273,12 +337,14 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("PENDING_COMPACTION_OPS", 1);
       List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1);
       List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -286,12 +352,14 @@ public class RequestHandler {
     }, true));
 
     app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("REFRESH_TABLE", 1);
       boolean success = sliceHandler
           
.refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
       writeValueAsString(ctx, success);
     }, false));
 
     
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1);
       List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -300,6 +368,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1);
       List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -308,6 +377,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, 
new ViewHandler(ctx -> {
+      metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1);
       List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
           ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -315,6 +385,7 @@ public class RequestHandler {
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new 
ViewHandler(ctx -> {
+      metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1);
       List<ClusteringOpDTO> dtos = 
sliceHandler.getFileGroupsInPendingClustering(
           
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
       writeValueAsString(ctx, dtos);
@@ -380,6 +451,12 @@ public class RequestHandler {
       } finally {
         long endTs = System.currentTimeMillis();
         long timeTakenMillis = endTs - beginTs;
+        metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
+        metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
+        metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
+        metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
+        metricsRegistry.add("TOTAL_API_CALLS", 1);
+
         LOG.info(String.format(
                 "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
                     + "Success=%s, Query=%s, Host=%s, synced=%s",
diff --git 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 47db1fd..3cae889 100644
--- 
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ 
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -29,10 +29,14 @@ import 
org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import io.javalin.Javalin;
+import io.javalin.core.util.JettyServerUtil;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -44,32 +48,40 @@ public class TimelineService {
 
   private static final Logger LOG = 
LogManager.getLogger(TimelineService.class);
   private static final int START_SERVICE_MAX_RETRIES = 16;
+  private static final int DEFAULT_NUM_THREADS = -1;
 
   private int serverPort;
   private Configuration conf;
   private transient FileSystem fs;
   private transient Javalin app = null;
   private transient FileSystemViewManager fsViewsManager;
+  private final int numThreads;
+  private final boolean shouldCompressOutput;
+  private final boolean useAsync;
 
   public int getServerPort() {
     return serverPort;
   }
 
-  public TimelineService(int serverPort, FileSystemViewManager 
globalFileSystemViewManager, Configuration conf)
-      throws IOException {
+  public TimelineService(int serverPort, FileSystemViewManager 
globalFileSystemViewManager, Configuration conf,
+      int numThreads, boolean compressOutput, boolean useAsync) throws 
IOException {
     this.conf = FSUtils.prepareHadoopConf(conf);
     this.fs = FileSystem.get(conf);
     this.serverPort = serverPort;
     this.fsViewsManager = globalFileSystemViewManager;
+    this.numThreads = numThreads;
+    this.shouldCompressOutput = compressOutput;
+    this.useAsync = useAsync;
   }
 
   public TimelineService(int serverPort, FileSystemViewManager 
globalFileSystemViewManager) throws IOException {
-    this(serverPort, globalFileSystemViewManager, new Configuration());
+    this(serverPort, globalFileSystemViewManager, new Configuration(), 
DEFAULT_NUM_THREADS, true, false);
   }
 
   public TimelineService(Config config) throws IOException {
     this(config.serverPort, buildFileSystemViewManager(config,
-        new SerializableConfiguration(FSUtils.prepareHadoopConf(new 
Configuration()))));
+        new SerializableConfiguration(FSUtils.prepareHadoopConf(new 
Configuration()))), new Configuration(),
+        config.numThreads, config.compress, config.async);
   }
 
   public static class Config implements Serializable {
@@ -97,6 +109,15 @@ public class TimelineService {
     @Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root 
directory for RocksDB")
     public String rocksDBPath = 
FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;
 
+    @Parameter(names = {"--threads", "-t"}, description = "Number of threads 
to use for serving requests")
+    public int numThreads = DEFAULT_NUM_THREADS;
+
+    @Parameter(names = {"--async"}, description = "Use asynchronous request 
processing")
+    public boolean async = false;
+
+    @Parameter(names = {"--compress"}, description = "Compress output using 
gzip")
+    public boolean compress = true;
+
     @Parameter(names = {"--help", "-h"})
     public Boolean help = false;
   }
@@ -129,8 +150,15 @@ public class TimelineService {
   }
 
   public int startService() throws IOException {
-    app = Javalin.create();
-    RequestHandler requestHandler = new RequestHandler(app, conf, 
fsViewsManager);
+    final Server server = numThreads == DEFAULT_NUM_THREADS ? 
JettyServerUtil.defaultServer()
+            : new Server(new QueuedThreadPool(numThreads));
+
+    app = Javalin.create().server(() -> server);
+    if (!shouldCompressOutput) {
+      app.disableDynamicGzip();
+    }
+
+    RequestHandler requestHandler = new RequestHandler(app, conf, 
fsViewsManager, useAsync);
     app.get("/", ctx -> ctx.result("Hello World"));
     requestHandler.register();
     int realServerPort = startServiceOnPort(serverPort);

Reply via email to