This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 08564b387fd [HUDI-7623] Refactoring of RemoteHoodieTableFileSystemView and RequestHandler (#11032)
08564b387fd is described below

commit 08564b387fd37351133c0bc215540029416a394c
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sun Apr 21 15:22:56 2024 +0700

    [HUDI-7623] Refactoring of RemoteHoodieTableFileSystemView and RequestHandler (#11032)
---
 .../view/RemoteHoodieTableFileSystemView.java      | 341 ++++++------------
 .../hudi/timeline/service/RequestHandler.java      | 390 +++++++++++----------
 2 files changed, 324 insertions(+), 407 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index d725778ddfe..84aed796e01 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -36,11 +36,11 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO;
 import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.RetryHelper;
-import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieRemoteException;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
@@ -66,67 +66,47 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
 
   private static final ObjectMapper OBJECT_MAPPER = new 
ObjectMapper().registerModule(new AfterburnerModule());
 
+  private static final String SCHEME = "http";
   private static final String BASE_URL = "/v1/hoodie/view";
   public static final String LATEST_PARTITION_SLICES_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
   public static final String LATEST_PARTITION_SLICES_INFLIGHT_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/inflight/");
   public static final String LATEST_PARTITION_SLICES_STATELESS_URL = 
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
   public static final String LATEST_PARTITION_SLICE_URL = 
String.format("%s/%s", BASE_URL, "slices/file/latest/");
-  public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
-      String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
+  public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =  
String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
   public static final String ALL_SLICES_URL = String.format("%s/%s", BASE_URL, 
"slices/all");
-  public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/");
+  public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/");
   public static final String LATEST_SLICES_RANGE_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "slices/range/latest/");
-  public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
-  public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
-
-  public static final String PENDING_COMPACTION_OPS = String.format("%s/%s", 
BASE_URL, "compactions/pending/");
-  public static final String PENDING_LOG_COMPACTION_OPS = 
String.format("%s/%s", BASE_URL, "logcompactions/pending/");
-
-  public static final String LATEST_PARTITION_DATA_FILES_URL =
-      String.format("%s/%s", BASE_URL, "datafiles/latest/partition");
-  public static final String LATEST_PARTITION_DATA_FILE_URL =
-      String.format("%s/%s", BASE_URL, "datafile/latest/partition");
-  public static final String ALL_DATA_FILES = String.format("%s/%s", BASE_URL, 
"datafiles/all");
-  public static final String LATEST_ALL_DATA_FILES = String.format("%s/%s", 
BASE_URL, "datafiles/all/latest/");
-  public static final String LATEST_DATA_FILE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "datafile/on/latest/");
-
-  public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
-  public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
-  public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
-      String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
-
-  public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
-      String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
-
-  public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
-      String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
-
-  public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
-      String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
-
-  public static final String ALL_REPLACED_FILEGROUPS_BEFORE =
-      String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+  public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
+  public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
 
-  public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON =
-          String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/");
+  public static final String PENDING_COMPACTION_OPS_URL = 
String.format("%s/%s", BASE_URL, "compactions/pending/");
+  public static final String PENDING_LOG_COMPACTION_OPS_URL = 
String.format("%s/%s", BASE_URL, "logcompactions/pending/");
 
-  public static final String ALL_REPLACED_FILEGROUPS_PARTITION =
-      String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
+  public static final String LATEST_PARTITION_DATA_FILES_URL = 
String.format("%s/%s", BASE_URL, "datafiles/latest/partition");
+  public static final String LATEST_PARTITION_DATA_FILE_URL = 
String.format("%s/%s", BASE_URL, "datafile/latest/partition");
+  public static final String ALL_DATA_FILES_URL = String.format("%s/%s", 
BASE_URL, "datafiles/all");
+  public static final String LATEST_ALL_DATA_FILES_URL = 
String.format("%s/%s", BASE_URL, "datafiles/all/latest/");
+  public static final String LATEST_DATA_FILE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "datafile/on/latest/");
+  public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
+  public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
+  public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL = 
String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
+
+  public static final String ALL_FILEGROUPS_FOR_PARTITION_URL = 
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
+  public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL = 
String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
+  public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL = 
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
+  public static final String ALL_REPLACED_FILEGROUPS_BEFORE_URL = 
String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+  public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL = 
String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/");
+  public static final String ALL_REPLACED_FILEGROUPS_PARTITION_URL = 
String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
   
-  public static final String PENDING_CLUSTERING_FILEGROUPS = 
String.format("%s/%s", BASE_URL, "clustering/pending/");
+  public static final String PENDING_CLUSTERING_FILEGROUPS_URL = 
String.format("%s/%s", BASE_URL, "clustering/pending/");
 
-  public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, 
"timeline/instant/last");
-  public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, 
"timeline/instants/last");
+  public static final String LAST_INSTANT_URL = String.format("%s/%s", 
BASE_URL, "timeline/instant/last");
+  public static final String LAST_INSTANTS_URL = String.format("%s/%s", 
BASE_URL, "timeline/instants/last");
 
-  public static final String TIMELINE = String.format("%s/%s", BASE_URL, 
"timeline/instants/all");
+  public static final String TIMELINE_URL = String.format("%s/%s", BASE_URL, 
"timeline/instants/all");
 
   // POST Requests
-  public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, 
"refresh/");
+  public static final String REFRESH_TABLE_URL = String.format("%s/%s", 
BASE_URL, "refresh/");
   public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", 
BASE_URL, "loadallpartitions/");
   public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", 
BASE_URL, "loadpartitions/");
 
@@ -143,6 +123,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public static final String REFRESH_OFF = "refreshoff";
   public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = 
"includependingcompaction";
 
+  public static final String MULTI_VALUE_SEPARATOR = ",";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
   private static final TypeReference<List<FileSliceDTO>> 
FILE_SLICE_DTOS_REFERENCE = new TypeReference<List<FileSliceDTO>>() {};
@@ -176,7 +157,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   }
 
   public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, 
FileSystemViewStorageConfig viewConf) {
-    this.basePath = metaClient.getBasePath();
+    this.basePath = metaClient.getBasePathV2().toString();
     this.metaClient = metaClient;
     this.timeline = 
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
     this.serverHost = viewConf.getRemoteViewServerHost();
@@ -196,9 +177,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
                                RequestMethod method) throws IOException {
     ValidationUtils.checkArgument(!closed, "View already closed");
 
-    URIBuilder builder =
-        new 
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http");
-
+    URIBuilder builder = new 
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme(SCHEME);
     queryParameters.forEach(builder::addParameter);
 
     // Adding mandatory parameters - Last instants affecting file-slice
@@ -206,7 +185,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
 
     String url = builder.toString();
-    LOG.info("Sending request : (" + url + ")");
+    LOG.info("Sending request : ({})", url);
     Response response = retryHelper != null ? retryHelper.start(() -> 
get(timeoutMs, url, method)) : get(timeoutMs, url, method);
     String content = response.returnContent().asString(Consts.UTF_8);
     return (T) OBJECT_MAPPER.readValue(content, reference);
@@ -252,32 +231,32 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     return paramsMap;
   }
 
+  private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(String 
requestPath, Map<String, String> paramsMap) {
+    try {
+      List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
+          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
+      return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
   @Override
   public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    return getLatestBaseFilesFromParams(paramsMap, 
LATEST_PARTITION_DATA_FILES_URL);
+    return getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILES_URL, 
paramsMap);
   }
 
   @Override
   public Stream<HoodieBaseFile> getLatestBaseFiles() {
     Map<String, String> paramsMap = getParams();
-    return getLatestBaseFilesFromParams(paramsMap, LATEST_ALL_DATA_FILES);
-  }
-
-  private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String, 
String> paramsMap, String requestPath) {
-    try {
-      List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
-          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return getLatestBaseFilesFromParams(LATEST_ALL_DATA_FILES_URL, paramsMap);
   }
 
   @Override
   public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String 
partitionPath, String maxCommitTime) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
-    return getLatestBaseFilesFromParams(paramsMap, 
LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
+    return 
getLatestBaseFilesFromParams(LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, 
paramsMap);
   }
 
   @Override
@@ -305,35 +284,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String 
instantTime, String fileId) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParams(partitionPath,
         new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime, 
fileId});
-    try {
-      List<BaseFileDTO> dataFiles = 
executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
-          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
-      return Option.fromJavaOptional(dataFiles.stream()
-          .map(BaseFileDTO::toHoodieBaseFile)
-          .findFirst());
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_DATA_FILE_ON_INSTANT_URL,
 paramsMap).findFirst());
   }
 
   @Override
   public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> 
commitsToReturn) {
-    Map<String, String> paramsMap =
-        getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new 
String[0]), ","));
-    return getLatestBaseFilesFromParams(paramsMap, 
LATEST_DATA_FILES_RANGE_INSTANT_URL);
+    Map<String, String> paramsMap = getParams(INSTANTS_PARAM, 
String.join(MULTI_VALUE_SEPARATOR, commitsToReturn));
+    return getLatestBaseFilesFromParams(LATEST_DATA_FILES_RANGE_INSTANT_URL, 
paramsMap);
   }
 
   @Override
   public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    return getLatestBaseFilesFromParams(paramsMap, ALL_DATA_FILES);
+    return getLatestBaseFilesFromParams(ALL_DATA_FILES_URL, paramsMap);
   }
 
   @Override
-  public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
-    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+  public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String 
fileId) {
+    Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
+    return 
Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILE_URL,
 paramsMap).findFirst());
+  }
+
+  private Stream<FileSlice> getLatestFileSlicesStreamFromParams(String 
requestPath, Map<String, String> paramsMap) {
     try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap,
+      List<FileSliceDTO> dataFiles = executeRequest(requestPath, paramsMap,
           FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
@@ -341,52 +315,34 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
+    Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    return getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_URL, 
paramsMap);
+  }
+
   @Override
   public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String 
partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICES_INFLIGHT_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_INFLIGHT_URL, 
paramsMap);
   }
 
   @Override
   public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_STATELESS_URL, 
paramsMap);
   }
 
   @Override
   public Option<FileSlice> getLatestFileSlice(String partitionPath, String 
fileId) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return 
Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst());
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
Option.fromJavaOptional(getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICE_URL,
 paramsMap).findFirst());
   }
 
   @Override
   public Stream<FileSlice> getLatestUnCompactedFileSlices(String 
partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, 
paramsMap);
   }
 
   @Override
@@ -395,13 +351,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     Map<String, String> paramsMap = 
getParamsWithAdditionalParams(partitionPath,
         new String[] {MAX_INSTANT_PARAM, 
INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
         new String[] {maxCommitTime, 
String.valueOf(includeFileSlicesInPendingCompaction)});
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getLatestFileSlicesStreamFromParams(LATEST_SLICES_BEFORE_ON_INSTANT_URL, 
paramsMap);
   }
 
   @Override
@@ -425,35 +375,26 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   @Override
   public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String 
partitionPath, String maxInstantTime) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getLatestFileSlicesStreamFromParams(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, 
paramsMap);
   }
 
   @Override
   public Stream<FileSlice> getLatestFileSliceInRange(List<String> 
commitsToReturn) {
-    Map<String, String> paramsMap =
-        getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new 
String[0]), ","));
-    try {
-      List<FileSliceDTO> dataFiles = 
executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap,
-          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    Map<String, String> paramsMap = getParams(INSTANTS_PARAM, 
String.join(MULTI_VALUE_SEPARATOR, commitsToReturn));
+    return 
getLatestFileSlicesStreamFromParams(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap);
   }
 
   @Override
   public Stream<FileSlice> getAllFileSlices(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+    return getLatestFileSlicesStreamFromParams(ALL_SLICES_URL, paramsMap);
+  }
+
+  private Stream<HoodieFileGroup> 
getAllFileGroupsForPartitionFromParams(String requestPath, Map<String, String> 
paramsMap) {
     try {
-      List<FileSliceDTO> dataFiles =
-          executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, 
RequestMethod.GET);
-      return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+      List<FileGroupDTO> fileGroups = executeRequest(requestPath, paramsMap,
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
+      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
@@ -462,73 +403,37 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   @Override
   public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap,
-          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_URL, 
paramsMap);
   }
 
   @Override
   public Stream<HoodieFileGroup> getAllFileGroupsStateless(String 
partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap,
-              new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL,
 paramsMap);
   }
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String 
maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap,
-          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL,
 paramsMap);
   }
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String 
maxCommitTime, String partitionPath) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
-          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_URL, 
paramsMap);
   }
 
   @Override
   public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String 
minCommitTime, String partitionPath) {
     Map<String, String> paramsMap = 
getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap,
-              FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL, 
paramsMap);
   }
 
   @Override
   public Stream<HoodieFileGroup> getAllReplacedFileGroups(String 
partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
-    try {
-      List<FileGroupDTO> fileGroups = 
executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
-          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
-      return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return 
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_PARTITION_URL, 
paramsMap);
   }
 
   public boolean refresh() {
@@ -536,38 +441,40 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     try {
       // refresh the local timeline first.
       this.timeline = 
metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-      return executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE, 
RequestMethod.POST);
+      return executeRequest(REFRESH_TABLE_URL, paramsMap, 
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
   }
 
-  @Override
-  public void loadAllPartitions() {
-    Map<String, String> paramsMap = getParams();
+  private void loadPartitions(String requestPath, Map<String, String> 
paramsMap) {
     try {
-      executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, 
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
+      executeRequest(requestPath, paramsMap, BOOLEAN_TYPE_REFERENCE, 
RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
   }
 
+  @Override
+  public void loadAllPartitions() {
+    Map<String, String> paramsMap = getParams();
+    loadPartitions(LOAD_ALL_PARTITIONS_URL, paramsMap);
+  }
+
   @Override
   public void loadPartitions(List<String> partitionPaths) {
+    Map<String, String> paramsMap = getParams();
     try {
-      Map<String, String> paramsMap = getParams();
       paramsMap.put(PARTITIONS_PARAM, 
OBJECT_MAPPER.writeValueAsString(partitionPaths));
-      executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, 
RequestMethod.POST);
-    } catch (IOException e) {
+    } catch (JsonProcessingException e) {
       throw new HoodieRemoteException(e);
     }
+    loadPartitions(LOAD_PARTITIONS_URL, paramsMap);
   }
 
-  @Override
-  public Stream<Pair<String, CompactionOperation>> 
getPendingCompactionOperations() {
-    Map<String, String> paramsMap = getParams();
+  private Stream<Pair<String, CompactionOperation>> 
getPendingCompactionOperations(String requestPath, Map<String, String> 
paramsMap) {
     try {
-      List<CompactionOpDTO> dtos = executeRequest(PENDING_COMPACTION_OPS, 
paramsMap,
+      List<CompactionOpDTO> dtos = executeRequest(requestPath, paramsMap,
           COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
       return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
     } catch (IOException e) {
@@ -575,23 +482,23 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
+  @Override
+  public Stream<Pair<String, CompactionOperation>> 
getPendingCompactionOperations() {
+    Map<String, String> paramsMap = getParams();
+    return getPendingCompactionOperations(PENDING_COMPACTION_OPS_URL, 
paramsMap);
+  }
+
   @Override
   public Stream<Pair<String, CompactionOperation>> 
getPendingLogCompactionOperations() {
     Map<String, String> paramsMap = getParams();
-    try {
-      List<CompactionOpDTO> dtos = executeRequest(PENDING_LOG_COMPACTION_OPS, 
paramsMap,
-          COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
-      return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+    return getPendingCompactionOperations(PENDING_LOG_COMPACTION_OPS_URL, 
paramsMap);
   }
 
   @Override
   public Stream<Pair<HoodieFileGroupId, HoodieInstant>> 
getFileGroupsInPendingClustering() {
     Map<String, String> paramsMap = getParams();
     try {
-      List<ClusteringOpDTO> dtos = 
executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap,
+      List<ClusteringOpDTO> dtos = 
executeRequest(PENDING_CLUSTERING_FILEGROUPS_URL, paramsMap,
           CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET);
       return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
     } catch (IOException e) {
@@ -599,22 +506,11 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
     }
   }
 
-  @Override
-  public void close() {
-    closed = true;
-  }
-
-  @Override
-  public void reset() {
-    refresh();
-  }
-
   @Override
   public Option<HoodieInstant> getLastInstant() {
     Map<String, String> paramsMap = getParams();
     try {
-      List<InstantDTO> instants =
-          executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE, 
RequestMethod.GET);
+      List<InstantDTO> instants = executeRequest(LAST_INSTANT_URL, paramsMap, 
INSTANT_DTOS_REFERENCE, RequestMethod.GET);
       return 
Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -625,31 +521,26 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView,
   public HoodieTimeline getTimeline() {
     Map<String, String> paramsMap = getParams();
     try {
-      TimelineDTO timeline =
-          executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET);
-      return TimelineDTO.toTimeline(timeline, metaClient);
+      TimelineDTO timelineDto = executeRequest(TIMELINE_URL, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET);
+      return TimelineDTO.toTimeline(timelineDto, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
   }
 
   @Override
-  public void sync() {
+  public void close() {
+    closed = true;
+  }
+
+  @Override
+  public void reset() {
     refresh();
   }
 
   @Override
-  public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) {
-    Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
-    try {
-      List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
-          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
-      return Option.fromJavaOptional(dataFiles.stream()
-          .map(BaseFileDTO::toHoodieBaseFile)
-          .findFirst());
-    } catch (IOException e) {
-      throw new HoodieRemoteException(e);
-    }
+  public void sync() {
+    refresh();
   }
 
   private Response get(int timeoutMs, String url, RequestMethod method) throws IOException {
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index ddaa9582f7f..be0b34d53e1 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -137,13 +137,52 @@ public class RequestHandler {
     metricsRegistry.add("WRITE_VALUE_CNT", 1);
     metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
     if (logger.isDebugEnabled()) {
-      logger.debug("Jsonify TimeTaken=" + jsonifyTime);
+      logger.debug("Jsonify TimeTaken={}", jsonifyTime);
     }
     return result;
   }
 
-  private static boolean isRefreshCheckDisabledInQuery(Context ctxt) {
-    return Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF));
+  private static String getBasePathParam(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+  }
+
+  private static String getPartitionParam(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("");
+  }
+
+  private static String getFileIdParam(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"));
+  }
+
+  private static List<String> getInstantsParam(Context ctx) {
+    return Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid"))
+        .split(RemoteHoodieTableFileSystemView.MULTI_VALUE_SEPARATOR));
+  }
+
+  private static String getMaxInstantParamMandatory(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"));
+  }
+
+  private static String getMaxInstantParamOptional(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault("");
+  }
+
+  private static String getMinInstantParam(Context ctx) {
+    return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM, String.class).getOrDefault("");
+  }
+
+  private static String getMarkerDirParam(Context ctx) {
+    return ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("");
+  }
+
+  private static boolean getIncludeFilesInPendingCompactionParam(Context ctx) {
+    return Boolean.parseBoolean(
+        ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, String.class)
+            .getOrThrow(e -> new HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid")));
+  }
+
+  private static String getInstantStateDirPathParam(Context ctx) {
+    return ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM);
   }
 
   public void register() {
@@ -164,59 +203,6 @@ public class RequestHandler {
     }
   }
 
-  /**
-   * Determines if local view of table's timeline is behind that of client's view.
-   */
-  private boolean isLocalViewBehind(Context ctx) {
-    String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
-    String lastKnownInstantFromClient =
-        ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
-    String timelineHashFromClient = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault("");
-    HoodieTimeline localTimeline =
-        viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient
-          + "], localTimeline=" + localTimeline.getInstants());
-    }
-
-    if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
-        && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
-      return false;
-    }
-
-    String localTimelineHash = localTimeline.getTimelineHash();
-    // refresh if timeline hash mismatches
-    if (!localTimelineHash.equals(timelineHashFromClient)) {
-      return true;
-    }
-
-    // As a safety check, even if hash is same, ensure instant is present
-    return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
-  }
-
-  /**
-   * Syncs data-set view if local view is behind.
-   */
-  private boolean syncIfLocalViewBehind(Context ctx) {
-    String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
-    SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
-    synchronized (view) {
-      if (isLocalViewBehind(ctx)) {
-
-        String lastKnownInstantFromClient = ctx.queryParamAsClass(
-                RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class)
-            .getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
-        HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline();
-        LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient
-            + " as last known instant but server has the following last instant on timeline :"
-            + localTimeline.lastInstant());
-        view.sync();
-        return true;
-      }
-    }
-    return false;
-  }
-
   private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException {
     if (timelineServiceConfig.async) {
       writeValueAsStringAsync(ctx, obj);
@@ -244,15 +230,15 @@ public class RequestHandler {
    * Register Timeline API calls.
    */
   private void registerTimelineAPI() {
-    app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LAST_INSTANT", 1);
-      List<InstantDTO> dtos = instantHandler.getLastInstant(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get());
+      List<InstantDTO> dtos = instantHandler.getLastInstant(getBasePathParam(ctx));
       writeValueAsString(ctx, dtos);
     }, false));
 
-    app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.TIMELINE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("TIMELINE", 1);
-      TimelineDTO dto = instantHandler.getTimeline(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get());
+      TimelineDTO dto = instantHandler.getTimeline(getBasePathParam(ctx));
       writeValueAsString(ctx, dto);
     }, false));
   }
@@ -264,68 +250,66 @@ public class RequestHandler {
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
-
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
+          getFileIdParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_ALL_DATA_FILES", 1);
-      List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(getBasePathParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
+          getMaxInstantParamMandatory(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1);
       Map<String, List<BaseFileDTO>> dtos = dataFileHandler.getAllLatestDataFilesBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+          getBasePathParam(ctx),
+          getMaxInstantParamMandatory(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
           ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_PARAM, String.class).get(),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+          getFileIdParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_DATA_FILES", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1);
       List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(",")));
+          getBasePathParam(ctx),
+          getInstantsParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
   }
@@ -337,129 +321,124 @@ public class RequestHandler {
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INFLIGHT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICES_INFLIGHT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesIncludingInflight(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
+          getFileIdParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_SLICES", 1);
       List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(",")));
+          getBasePathParam(ctx),
+          getInstantsParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestMergedFileSlicesBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
+          getMaxInstantParamMandatory(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1);
       List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")),
-          Boolean.parseBoolean(
-              ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, String.class)
-                  .getOrThrow(e -> new HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid"))));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx),
+          getMaxInstantParamMandatory(ctx),
+          getIncludeFilesInPendingCompactionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_LATEST_SLICES_BEFORE_ON_INSTANT", 1);
       Map<String, List<FileSliceDTO>> dtos = sliceHandler.getAllLatestFileSlicesBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")));
+          getBasePathParam(ctx),
+          getMaxInstantParamMandatory(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
-      List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(getBasePathParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("PEDING_LOG_COMPACTION_OPS", 1);
-      List<CompactionOpDTO> dtos = sliceHandler.getPendingLogCompactionOperations(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      List<CompactionOpDTO> dtos = sliceHandler.getPendingLogCompactionOperations(getBasePathParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1);
       List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
     app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1);
       List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> {
+    app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("REFRESH_TABLE", 1);
-      boolean success = sliceHandler
-          .refreshTable(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      boolean success = sliceHandler.refreshTable(getBasePathParam(ctx));
       writeValueAsString(ctx, success);
     }, false));
 
     app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LOAD_PARTITIONS", 1);
-      String basePath = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+      String basePath = getBasePathParam(ctx);
       try {
         List<String> partitionPaths = OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM, String.class)
             .getOrThrow(e -> new HoodieException("Partitions param is invalid")), LIST_TYPE_REFERENCE);
@@ -472,50 +451,48 @@ public class RequestHandler {
 
     app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
-      boolean success = sliceHandler
-          .loadAllPartitions(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      boolean success = sliceHandler.loadAllPartitions(getBasePathParam(ctx));
       writeValueAsString(ctx, success);
     }, false));
 
-    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1);
       List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getMaxInstantParamOptional(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1);
       List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getMaxInstantParamOptional(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_REPLACED_FILEGROUPS_AFTER_OR_ON", 1);
       List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsAfterOrOn(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM, String.class).getOrDefault(""),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getMinInstantParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1);
       List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""));
+          getBasePathParam(ctx),
+          getPartitionParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
 
-    app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> {
+    app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1);
-      List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(
-          ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+      List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(getBasePathParam(ctx));
       writeValueAsString(ctx, dtos);
     }, true));
   }
@@ -523,22 +500,19 @@ public class RequestHandler {
   private void registerMarkerAPI() {
     app.get(MarkerOperation.ALL_MARKERS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_MARKERS", 1);
-      Set<String> markers = markerHandler.getAllMarkers(
-          ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""));
+      Set<String> markers = markerHandler.getAllMarkers(getMarkerDirParam(ctx));
       writeValueAsString(ctx, markers);
     }, false));
 
     app.get(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1);
-      Set<String> markers = markerHandler.getCreateAndMergeMarkers(
-          ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""));
+      Set<String> markers = markerHandler.getCreateAndMergeMarkers(getMarkerDirParam(ctx));
       writeValueAsString(ctx, markers);
     }, false));
 
     app.get(MarkerOperation.MARKERS_DIR_EXISTS_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("MARKERS_DIR_EXISTS", 1);
-      boolean exist = markerHandler.doesMarkerDirExist(
-          ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""));
+      boolean exist = markerHandler.doesMarkerDirExist(getMarkerDirParam(ctx));
       writeValueAsString(ctx, exist);
     }, false));
 
@@ -546,15 +520,14 @@ public class RequestHandler {
       metricsRegistry.add("CREATE_MARKER", 1);
       ctx.future(markerHandler.createMarker(
           ctx,
-          ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, 
String.class).getOrDefault(""),
+          getMarkerDirParam(ctx),
           ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, 
String.class).getOrDefault(""),
           ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, 
String.class).getOrDefault("")));
     }, false));
 
     app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("DELETE_MARKER_DIR", 1);
-      boolean success = markerHandler.deleteMarkers(
-          ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, 
String.class).getOrDefault(""));
+      boolean success = markerHandler.deleteMarkers(getMarkerDirParam(ctx));
       writeValueAsString(ctx, success);
     }, false));
   }
@@ -562,32 +535,17 @@ public class RequestHandler {
   private void registerInstantStateAPI() {
     app.get(InstantStateHandler.ALL_INSTANT_STATE_URL, new ViewHandler(ctx -> {
       metricsRegistry.add("ALL_INSTANT_STATE", 1);
-      List<InstantStateDTO> instantStates = 
instantStateHandler.getAllInstantStates(
-          ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
-      );
+      List<InstantStateDTO> instantStates = 
instantStateHandler.getAllInstantStates(getInstantStateDirPathParam(ctx));
       writeValueAsString(ctx, instantStates);
     }, false));
 
     app.post(InstantStateHandler.REFRESH_INSTANT_STATE, new ViewHandler(ctx -> 
{
       metricsRegistry.add("REFRESH_INSTANT_STATE", 1);
-      boolean success = instantStateHandler.refresh(
-          ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
-      );
+      boolean success = 
instantStateHandler.refresh(getInstantStateDirPathParam(ctx));
       writeValueAsString(ctx, success);
     }, false));
   }
 
-  /**
-   * Determine whether to throw an exception when local view of table's 
timeline is behind that of client's view.
-   */
-  private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline 
localTimeline, String timelineHashFromClient) {
-    Option<HoodieInstant> lastInstant = localTimeline.lastInstant();
-    // When performing async clean, we may have one more .clean.completed 
after lastInstantTs.
-    // In this case, we do not need to throw an exception.
-    return !lastInstant.isPresent() || 
!lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION)
-        || 
!localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient);
-  }
-
   /**
    * Used for logging and performing refresh check.
    */
@@ -634,16 +592,13 @@ public class RequestHandler {
           if (refreshCheck) {
             long beginFinalCheck = System.currentTimeMillis();
             if (isLocalViewBehind(context)) {
-              String lastKnownInstantFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, 
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
-              String timelineHashFromClient = 
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, 
String.class).getOrDefault("");
+              String lastKnownInstantFromClient = 
getLastInstantTsParam(context);
+              String timelineHashFromClient = getTimelineHashParam(context);
               HoodieTimeline localTimeline =
                   
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
               if (shouldThrowExceptionIfLocalViewBehind(localTimeline, 
timelineHashFromClient)) {
-                String errMsg =
-                    "Last known instant from client was "
-                        + lastKnownInstantFromClient
-                        + " but server has the following timeline "
-                        + localTimeline.getInstants();
+                String errMsg = String.format("Last known instant from client 
was %s but server has the following timeline %s",
+                        lastKnownInstantFromClient, 
localTimeline.getInstants());
                 throw new BadRequestResponse(errMsg);
               }
             }
@@ -653,9 +608,9 @@ public class RequestHandler {
         } catch (RuntimeException re) {
           success = false;
           if (re instanceof BadRequestResponse) {
-            LOG.warn("Bad request response due to client view behind server 
view. " + re.getMessage());
+            LOG.warn("Bad request response due to client view behind server 
view. {}", re.getMessage());
           } else {
-            LOG.error("Got runtime exception servicing request " + 
context.queryString(), re);
+            LOG.error(String.format("Got runtime exception servicing request 
%s", context.queryString()), re);
           }
           throw re;
         } finally {
@@ -667,14 +622,85 @@ public class RequestHandler {
           metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
           metricsRegistry.add("TOTAL_API_CALLS", 1);
 
-          LOG.debug(String.format(
-              "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
-                  + "Success=%s, Query=%s, Host=%s, synced=%s",
-              timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, 
finalCheckTimeTaken, success,
-              context.queryString(), context.host(), synced));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("TimeTakenMillis[Total={}, Refresh={}, handle={}, 
Check={}], Success={}, Query={}, Host={}, synced={}",
+                    timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, 
finalCheckTimeTaken, success, context.queryString(), context.host(), synced);
+          }
         }
         return null;
       });
     }
+
+    /**
+     * Determines if local view of table's timeline is behind that of client's 
view.
+     */
+    private boolean isLocalViewBehind(Context ctx) {
+      String basePath = 
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
+      String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
+      String timelineHashFromClient = getTimelineHashParam(ctx);
+      HoodieTimeline localTimeline =
+          
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Client [ LastTs={}, TimelineHash={}], 
localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, 
localTimeline.getInstants());
+      }
+
+      if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
+          && 
HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
+        return false;
+      }
+
+      String localTimelineHash = localTimeline.getTimelineHash();
+      // refresh if timeline hash mismatches
+      if (!localTimelineHash.equals(timelineHashFromClient)) {
+        return true;
+      }
+
+      // As a safety check, even if hash is same, ensure instant is present
+      return 
!localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
+    }
+
+    /**
+     * Syncs data-set view if local view is behind.
+     */
+    private boolean syncIfLocalViewBehind(Context ctx) {
+      String basePath = 
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
+      SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
+      synchronized (view) {
+        if (isLocalViewBehind(ctx)) {
+          String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
+          HoodieTimeline localTimeline = 
viewManager.getFileSystemView(basePath).getTimeline();
+          if (LOG.isInfoEnabled()) {
+            LOG.info("Syncing view as client passed last known instant {} as 
last known instant but server has the following last instant on timeline: {}",
+                lastKnownInstantFromClient, localTimeline.lastInstant());
+          }
+          view.sync();
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Determine whether to throw an exception when local view of table's 
timeline is behind that of client's view.
+     */
+    private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline 
localTimeline, String timelineHashFromClient) {
+      Option<HoodieInstant> lastInstant = localTimeline.lastInstant();
+      // When performing async clean, we may have one more .clean.completed 
after lastInstantTs.
+      // In this case, we do not need to throw an exception.
+      return !lastInstant.isPresent() || 
!lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION)
+          || 
!localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient);
+    }
+
+    private boolean isRefreshCheckDisabledInQuery(Context ctx) {
+      return 
Boolean.parseBoolean(ctx.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF));
+    }
+
+    private String getLastInstantTsParam(Context ctx) {
+      return 
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, 
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
+    }
+
+    private String getTimelineHashParam(Context ctx) {
+      return 
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, 
String.class).getOrDefault("");
+    }
   }
 }

Reply via email to