This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f11a6c7 [HUDI-1553] Configuration and metrics for the
TimelineService. (#2495)
f11a6c7 is described below
commit f11a6c7b2d4ef045419a4522e8e203f51292b816
Author: Prashant Wason <[email protected]>
AuthorDate: Tue Mar 2 21:58:41 2021 -0800
[HUDI-1553] Configuration and metrics for the TimelineService. (#2495)
---
.../embedded/EmbeddedTimelineServerHelper.java | 4 +-
.../client/embedded/EmbeddedTimelineService.java | 11 ++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 22 +++++-
.../hudi/timeline/service/RequestHandler.java | 85 +++++++++++++++++++++-
.../hudi/timeline/service/TimelineService.java | 40 ++++++++--
5 files changed, 147 insertions(+), 15 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
index fa74aa3..e5a719e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java
@@ -50,7 +50,9 @@ public class EmbeddedTimelineServerHelper {
LOG.info("Starting Timeline service !!");
Option<String> hostAddr =
context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
timelineServer = Option.of(new EmbeddedTimelineService(context,
hostAddr.orElse(null), config.getEmbeddedTimelineServerPort(),
- config.getMetadataConfig(),
config.getClientSpecifiedViewStorageConfig(), config.getBasePath()));
+ config.getMetadataConfig(),
config.getClientSpecifiedViewStorageConfig(), config.getBasePath(),
+ config.getEmbeddedTimelineServerThreads(),
config.getEmbeddedTimelineServerCompressOutput(),
+ config.getEmbeddedTimelineServerUseAsync()));
timelineServer.get().startServer();
updateWriteConfigWithTimelineServer(timelineServer.get(), config);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
index 386f7d5..a2bc711 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java
@@ -48,11 +48,15 @@ public class EmbeddedTimelineService {
private final HoodieMetadataConfig metadataConfig;
private final String basePath;
+ private final int numThreads;
+ private final boolean shouldCompressOutput;
+ private final boolean useAsync;
private transient FileSystemViewManager viewManager;
private transient TimelineService server;
public EmbeddedTimelineService(HoodieEngineContext context, String
embeddedTimelineServiceHostAddr, int embeddedTimelineServerPort,
- HoodieMetadataConfig metadataConfig,
FileSystemViewStorageConfig config, String basePath) {
+ HoodieMetadataConfig metadataConfig,
FileSystemViewStorageConfig config, String basePath,
+ int numThreads, boolean compressOutput,
boolean useAsync) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.config = config;
@@ -61,6 +65,9 @@ public class EmbeddedTimelineService {
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
this.preferredPort = embeddedTimelineServerPort;
+ this.numThreads = numThreads;
+ this.shouldCompressOutput = compressOutput;
+ this.useAsync = useAsync;
}
private FileSystemViewManager createViewManager() {
@@ -77,7 +84,7 @@ public class EmbeddedTimelineService {
}
public void startServer() throws IOException {
- server = new TimelineService(preferredPort, viewManager,
hadoopConf.newCopy());
+ server = new TimelineService(preferredPort, viewManager,
hadoopConf.newCopy(), numThreads, shouldCompressOutput, useAsync);
serverPort = server.startService();
LOG.info("Started embedded timeline server at " + hostAddr + ":" +
serverPort);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ee0e3fa..4e493e4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -113,6 +113,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = "true";
public static final String EMBEDDED_TIMELINE_SERVER_PORT =
"hoodie.embed.timeline.server.port";
public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT = "0";
+ public static final String EMBEDDED_TIMELINE_SERVER_THREADS =
"hoodie.embed.timeline.server.threads";
+ public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS = "-1";
+ public static final String EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT =
"hoodie.embed.timeline.server.gzip";
+ public static final String DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT =
"true";
+ public static final String EMBEDDED_TIMELINE_SERVER_USE_ASYNC =
"hoodie.embed.timeline.server.async";
+ public static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC = "false";
public static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP =
"hoodie.fail.on.timeline.archiving";
public static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED =
"true";
@@ -317,6 +323,18 @@ public class HoodieWriteConfig extends DefaultHoodieConfig
{
return Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_PORT,
DEFAULT_EMBEDDED_TIMELINE_SERVER_PORT));
}
+ public int getEmbeddedTimelineServerThreads() {
+ return
Integer.parseInt(props.getProperty(EMBEDDED_TIMELINE_SERVER_THREADS,
DEFAULT_EMBEDDED_TIMELINE_SERVER_THREADS));
+ }
+
+ public boolean getEmbeddedTimelineServerCompressOutput() {
+ return
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_COMPRESS_OUTPUT,
DEFAULT_EMBEDDED_TIMELINE_COMPRESS_OUTPUT));
+ }
+
+ public boolean getEmbeddedTimelineServerUseAsync() {
+ return
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_USE_ASYNC,
DEFAULT_EMBEDDED_TIMELINE_SERVER_ASYNC));
+ }
+
public boolean isFailOnTimelineArchivingEnabled() {
return
Boolean.parseBoolean(props.getProperty(FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP));
}
@@ -497,7 +515,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public long getClusteringMaxBytesInGroup() {
return
Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_MAX_BYTES_PER_GROUP));
}
-
+
public long getClusteringSmallFileLimit() {
return
Long.parseLong(props.getProperty(HoodieClusteringConfig.CLUSTERING_PLAN_SMALL_FILE_LIMIT));
}
@@ -513,7 +531,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
public int getTargetPartitionsForClustering() {
return
Integer.parseInt(props.getProperty(HoodieClusteringConfig.CLUSTERING_TARGET_PARTITIONS));
}
-
+
public String getClusteringSortColumns() {
return
props.getProperty(HoodieClusteringConfig.CLUSTERING_SORT_COLUMNS_PROPERTY);
}
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 22d3082..09ebeb5 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -18,6 +18,7 @@
package org.apache.hudi.timeline.service;
+import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
@@ -29,7 +30,9 @@ import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.timeline.service.handlers.BaseFileHandler;
import org.apache.hudi.timeline.service.handlers.FileSliceHandler;
import org.apache.hudi.timeline.service.handlers.TimelineHandler;
@@ -47,6 +50,9 @@ import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
/**
@@ -62,13 +68,24 @@ public class RequestHandler {
private final TimelineHandler instantHandler;
private final FileSliceHandler sliceHandler;
private final BaseFileHandler dataFileHandler;
+ private Registry metricsRegistry = Registry.getRegistry("TimelineService");
+ private ScheduledExecutorService asyncResultService =
Executors.newSingleThreadScheduledExecutor();
+ private final boolean useAsync;
- public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager
viewManager) throws IOException {
+ public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager
viewManager, boolean useAsync) throws IOException {
this.viewManager = viewManager;
this.app = app;
this.instantHandler = new TimelineHandler(conf, viewManager);
this.sliceHandler = new FileSliceHandler(conf, viewManager);
this.dataFileHandler = new BaseFileHandler(conf, viewManager);
+ this.useAsync = useAsync;
+ if (useAsync) {
+ asyncResultService = Executors.newSingleThreadScheduledExecutor();
+ }
+ }
+
+ public RequestHandler(Javalin app, Configuration conf, FileSystemViewManager
viewManager) throws IOException {
+ this(app, conf, viewManager, false);
}
public void register() {
@@ -130,13 +147,44 @@ public class RequestHandler {
}
private void writeValueAsString(Context ctx, Object obj) throws
JsonProcessingException {
+ if (useAsync) {
+ writeValueAsStringAsync(ctx, obj);
+ } else {
+ writeValueAsStringSync(ctx, obj);
+ }
+ }
+
+ private void writeValueAsStringSync(Context ctx, Object obj) throws
JsonProcessingException {
+ HoodieTimer timer = new HoodieTimer().startTimer();
boolean prettyPrint = ctx.queryParam("pretty") != null;
- long beginJsonTs = System.currentTimeMillis();
String result =
prettyPrint ?
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) :
OBJECT_MAPPER.writeValueAsString(obj);
- long endJsonTs = System.currentTimeMillis();
- LOG.debug("Jsonify TimeTaken=" + (endJsonTs - beginJsonTs));
+ final long jsonifyTime = timer.endTimer();
ctx.result(result);
+ metricsRegistry.add("WRITE_VALUE_CNT", 1);
+ metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
+ }
+ }
+
+ private void writeValueAsStringAsync(Context ctx, Object obj) throws
JsonProcessingException {
+ ctx.result(CompletableFuture.supplyAsync(() -> {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ boolean prettyPrint = ctx.queryParam("pretty") != null;
+ try {
+ String result = prettyPrint ?
OBJECT_MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(obj) :
OBJECT_MAPPER.writeValueAsString(obj);
+ final long jsonifyTime = timer.endTimer();
+ metricsRegistry.add("WRITE_VALUE_CNT", 1);
+ metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Jsonify TimeTaken=" + jsonifyTime);
+ }
+ return result;
+ } catch (JsonProcessingException e) {
+ throw new HoodieException("Failed to JSON encode the value", e);
+ }
+ }, asyncResultService));
}
/**
@@ -144,12 +192,14 @@ public class RequestHandler {
*/
private void registerTimelineAPI() {
app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx
-> {
+ metricsRegistry.add("LAST_INSTANT", 1);
List<InstantDTO> dtos = instantHandler
.getLastInstant(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
writeValueAsString(ctx, dtos);
}, false));
app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> {
+ metricsRegistry.add("TIMELINE", 1);
TimelineDTO dto = instantHandler
.getTimeline(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getValue());
writeValueAsString(ctx, dto);
@@ -161,6 +211,7 @@ public class RequestHandler {
*/
private void registerDataFilesAPI() {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -168,6 +219,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -176,12 +228,14 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new
ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_ALL_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler
.getLatestDataFiles(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -190,6 +244,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -199,6 +254,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new
ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -206,6 +262,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
Arrays
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -218,6 +275,7 @@ public class RequestHandler {
*/
private void registerFileSlicesAPI() {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -225,6 +283,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -233,6 +292,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -240,6 +300,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new
ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -247,6 +308,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
Arrays
.asList(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.INSTANTS_PARAM).getOrThrow().split(",")));
@@ -254,6 +316,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos =
sliceHandler.getLatestMergedFileSlicesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -262,6 +325,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""),
@@ -273,12 +337,14 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new
ViewHandler(ctx -> {
+ metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -286,12 +352,14 @@ public class RequestHandler {
}, true));
app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new
ViewHandler(ctx -> {
+ metricsRegistry.add("REFRESH_TABLE", 1);
boolean success = sliceHandler
.refreshTable(ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, success);
}, false));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -300,6 +368,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,""),
@@ -308,6 +377,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION,
new ViewHandler(ctx -> {
+ metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow(),
ctx.queryParam(RemoteHoodieTableFileSystemView.PARTITION_PARAM,""));
@@ -315,6 +385,7 @@ public class RequestHandler {
}, true));
app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new
ViewHandler(ctx -> {
+ metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1);
List<ClusteringOpDTO> dtos =
sliceHandler.getFileGroupsInPendingClustering(
ctx.validatedQueryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM).getOrThrow());
writeValueAsString(ctx, dtos);
@@ -380,6 +451,12 @@ public class RequestHandler {
} finally {
long endTs = System.currentTimeMillis();
long timeTakenMillis = endTs - beginTs;
+ metricsRegistry.add("TOTAL_API_TIME", timeTakenMillis);
+ metricsRegistry.add("TOTAL_REFRESH_TIME", refreshCheckTimeTaken);
+ metricsRegistry.add("TOTAL_HANDLE_TIME", handleTimeTaken);
+ metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
+ metricsRegistry.add("TOTAL_API_CALLS", 1);
+
LOG.info(String.format(
"TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
+ "Success=%s, Query=%s, Host=%s, synced=%s",
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 47db1fd..3cae889 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -29,10 +29,14 @@ import
org.apache.hudi.common.table.view.FileSystemViewStorageType;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import io.javalin.Javalin;
+import io.javalin.core.util.JettyServerUtil;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import java.io.IOException;
import java.io.Serializable;
@@ -44,32 +48,40 @@ public class TimelineService {
private static final Logger LOG =
LogManager.getLogger(TimelineService.class);
private static final int START_SERVICE_MAX_RETRIES = 16;
+ private static final int DEFAULT_NUM_THREADS = -1;
private int serverPort;
private Configuration conf;
private transient FileSystem fs;
private transient Javalin app = null;
private transient FileSystemViewManager fsViewsManager;
+ private final int numThreads;
+ private final boolean shouldCompressOutput;
+ private final boolean useAsync;
public int getServerPort() {
return serverPort;
}
- public TimelineService(int serverPort, FileSystemViewManager
globalFileSystemViewManager, Configuration conf)
- throws IOException {
+ public TimelineService(int serverPort, FileSystemViewManager
globalFileSystemViewManager, Configuration conf,
+ int numThreads, boolean compressOutput, boolean useAsync) throws
IOException {
this.conf = FSUtils.prepareHadoopConf(conf);
this.fs = FileSystem.get(conf);
this.serverPort = serverPort;
this.fsViewsManager = globalFileSystemViewManager;
+ this.numThreads = numThreads;
+ this.shouldCompressOutput = compressOutput;
+ this.useAsync = useAsync;
}
public TimelineService(int serverPort, FileSystemViewManager
globalFileSystemViewManager) throws IOException {
- this(serverPort, globalFileSystemViewManager, new Configuration());
+ this(serverPort, globalFileSystemViewManager, new Configuration(),
DEFAULT_NUM_THREADS, true, false);
}
public TimelineService(Config config) throws IOException {
this(config.serverPort, buildFileSystemViewManager(config,
- new SerializableConfiguration(FSUtils.prepareHadoopConf(new
Configuration()))));
+ new SerializableConfiguration(FSUtils.prepareHadoopConf(new
Configuration()))), new Configuration(),
+ config.numThreads, config.compress, config.async);
}
public static class Config implements Serializable {
@@ -97,6 +109,15 @@ public class TimelineService {
@Parameter(names = {"--rocksdb-path", "-rp"}, description = "Root
directory for RocksDB")
public String rocksDBPath =
FileSystemViewStorageConfig.DEFAULT_ROCKSDB_BASE_PATH;
+ @Parameter(names = {"--threads", "-t"}, description = "Number of threads
to use for serving requests")
+ public int numThreads = DEFAULT_NUM_THREADS;
+
+ @Parameter(names = {"--async"}, description = "Use asyncronous request
processing")
+ public boolean async = false;
+
+ @Parameter(names = {"--compress"}, description = "Compress output using
gzip")
+ public boolean compress = true;
+
@Parameter(names = {"--help", "-h"})
public Boolean help = false;
}
@@ -129,8 +150,15 @@ public class TimelineService {
}
public int startService() throws IOException {
- app = Javalin.create();
- RequestHandler requestHandler = new RequestHandler(app, conf,
fsViewsManager);
+ final Server server = numThreads == DEFAULT_NUM_THREADS ?
JettyServerUtil.defaultServer()
+ : new Server(new QueuedThreadPool(numThreads));
+
+ app = Javalin.create().server(() -> server);
+ if (!shouldCompressOutput) {
+ app.disableDynamicGzip();
+ }
+
+ RequestHandler requestHandler = new RequestHandler(app, conf,
fsViewsManager, useAsync);
app.get("/", ctx -> ctx.result("Hello World"));
requestHandler.register();
int realServerPort = startServiceOnPort(serverPort);