This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 08564b387fd [HUDI-7623] Refactoring of RemoteHoodieTableFileSystemView
and RequestHandler (#11032)
08564b387fd is described below
commit 08564b387fd37351133c0bc215540029416a394c
Author: Vova Kolmakov <[email protected]>
AuthorDate: Sun Apr 21 15:22:56 2024 +0700
[HUDI-7623] Refactoring of RemoteHoodieTableFileSystemView and
RequestHandler (#11032)
---
.../view/RemoteHoodieTableFileSystemView.java | 341 ++++++------------
.../hudi/timeline/service/RequestHandler.java | 390 +++++++++++----------
2 files changed, 324 insertions(+), 407 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
index d725778ddfe..84aed796e01 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java
@@ -36,11 +36,11 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO;
import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.RetryHelper;
-import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieRemoteException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
@@ -66,67 +66,47 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
private static final ObjectMapper OBJECT_MAPPER = new
ObjectMapper().registerModule(new AfterburnerModule());
+ private static final String SCHEME = "http";
private static final String BASE_URL = "/v1/hoodie/view";
public static final String LATEST_PARTITION_SLICES_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/");
public static final String LATEST_PARTITION_SLICES_INFLIGHT_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/inflight/");
public static final String LATEST_PARTITION_SLICES_STATELESS_URL =
String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/");
public static final String LATEST_PARTITION_SLICE_URL =
String.format("%s/%s", BASE_URL, "slices/file/latest/");
- public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
- String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
+ public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL =
String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/");
public static final String ALL_SLICES_URL = String.format("%s/%s", BASE_URL,
"slices/all");
- public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/");
+ public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/");
public static final String LATEST_SLICES_RANGE_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/range/latest/");
- public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
- public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
-
- public static final String PENDING_COMPACTION_OPS = String.format("%s/%s",
BASE_URL, "compactions/pending/");
- public static final String PENDING_LOG_COMPACTION_OPS =
String.format("%s/%s", BASE_URL, "logcompactions/pending/");
-
- public static final String LATEST_PARTITION_DATA_FILES_URL =
- String.format("%s/%s", BASE_URL, "datafiles/latest/partition");
- public static final String LATEST_PARTITION_DATA_FILE_URL =
- String.format("%s/%s", BASE_URL, "datafile/latest/partition");
- public static final String ALL_DATA_FILES = String.format("%s/%s", BASE_URL,
"datafiles/all");
- public static final String LATEST_ALL_DATA_FILES = String.format("%s/%s",
BASE_URL, "datafiles/all/latest/");
- public static final String LATEST_DATA_FILE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "datafile/on/latest/");
-
- public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
- public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
- public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
- String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
-
- public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
- String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
-
- public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
- String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
-
- public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON =
- String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
-
- public static final String ALL_REPLACED_FILEGROUPS_BEFORE =
- String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+ public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/");
+ public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/");
- public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON =
- String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/");
+ public static final String PENDING_COMPACTION_OPS_URL =
String.format("%s/%s", BASE_URL, "compactions/pending/");
+ public static final String PENDING_LOG_COMPACTION_OPS_URL =
String.format("%s/%s", BASE_URL, "logcompactions/pending/");
- public static final String ALL_REPLACED_FILEGROUPS_PARTITION =
- String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
+ public static final String LATEST_PARTITION_DATA_FILES_URL =
String.format("%s/%s", BASE_URL, "datafiles/latest/partition");
+ public static final String LATEST_PARTITION_DATA_FILE_URL =
String.format("%s/%s", BASE_URL, "datafile/latest/partition");
+ public static final String ALL_DATA_FILES_URL = String.format("%s/%s",
BASE_URL, "datafiles/all");
+ public static final String LATEST_ALL_DATA_FILES_URL =
String.format("%s/%s", BASE_URL, "datafiles/all/latest/");
+ public static final String LATEST_DATA_FILE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "datafile/on/latest/");
+ public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL =
String.format("%s/%s", BASE_URL, "datafiles/range/latest/");
+ public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/");
+ public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL =
String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/");
+
+ public static final String ALL_FILEGROUPS_FOR_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/");
+ public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL =
String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/");
+ public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL =
String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/");
+ public static final String ALL_REPLACED_FILEGROUPS_BEFORE_URL =
String.format("%s/%s", BASE_URL, "filegroups/replaced/before/");
+ public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL =
String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/");
+ public static final String ALL_REPLACED_FILEGROUPS_PARTITION_URL =
String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/");
- public static final String PENDING_CLUSTERING_FILEGROUPS =
String.format("%s/%s", BASE_URL, "clustering/pending/");
+ public static final String PENDING_CLUSTERING_FILEGROUPS_URL =
String.format("%s/%s", BASE_URL, "clustering/pending/");
- public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL,
"timeline/instant/last");
- public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL,
"timeline/instants/last");
+ public static final String LAST_INSTANT_URL = String.format("%s/%s",
BASE_URL, "timeline/instant/last");
+ public static final String LAST_INSTANTS_URL = String.format("%s/%s",
BASE_URL, "timeline/instants/last");
- public static final String TIMELINE = String.format("%s/%s", BASE_URL,
"timeline/instants/all");
+ public static final String TIMELINE_URL = String.format("%s/%s", BASE_URL,
"timeline/instants/all");
// POST Requests
- public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL,
"refresh/");
+ public static final String REFRESH_TABLE_URL = String.format("%s/%s",
BASE_URL, "refresh/");
public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s",
BASE_URL, "loadallpartitions/");
public static final String LOAD_PARTITIONS_URL = String.format("%s/%s",
BASE_URL, "loadpartitions/");
@@ -143,6 +123,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public static final String REFRESH_OFF = "refreshoff";
public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM =
"includependingcompaction";
+ public static final String MULTI_VALUE_SEPARATOR = ",";
private static final Logger LOG =
LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class);
private static final TypeReference<List<FileSliceDTO>>
FILE_SLICE_DTOS_REFERENCE = new TypeReference<List<FileSliceDTO>>() {};
@@ -176,7 +157,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient,
FileSystemViewStorageConfig viewConf) {
- this.basePath = metaClient.getBasePath();
+ this.basePath = metaClient.getBasePathV2().toString();
this.metaClient = metaClient;
this.timeline =
metaClient.getActiveTimeline().filterCompletedAndCompactionInstants();
this.serverHost = viewConf.getRemoteViewServerHost();
@@ -196,9 +177,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
RequestMethod method) throws IOException {
ValidationUtils.checkArgument(!closed, "View already closed");
- URIBuilder builder =
- new
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http");
-
+ URIBuilder builder = new
URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme(SCHEME);
queryParameters.forEach(builder::addParameter);
// Adding mandatory parameters - Last instants affecting file-slice
@@ -206,7 +185,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash());
String url = builder.toString();
- LOG.info("Sending request : (" + url + ")");
+ LOG.info("Sending request : ({})", url);
Response response = retryHelper != null ? retryHelper.start(() ->
get(timeoutMs, url, method)) : get(timeoutMs, url, method);
String content = response.returnContent().asString(Consts.UTF_8);
return (T) OBJECT_MAPPER.readValue(content, reference);
@@ -252,32 +231,32 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
return paramsMap;
}
+ private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(String
requestPath, Map<String, String> paramsMap) {
+ try {
+ List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
+ BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
+ return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
+ } catch (IOException e) {
+ throw new HoodieRemoteException(e);
+ }
+ }
+
@Override
public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- return getLatestBaseFilesFromParams(paramsMap,
LATEST_PARTITION_DATA_FILES_URL);
+ return getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILES_URL,
paramsMap);
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFiles() {
Map<String, String> paramsMap = getParams();
- return getLatestBaseFilesFromParams(paramsMap, LATEST_ALL_DATA_FILES);
- }
-
- private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String,
String> paramsMap, String requestPath) {
- try {
- List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
- BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return getLatestBaseFilesFromParams(LATEST_ALL_DATA_FILES_URL, paramsMap);
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String
partitionPath, String maxCommitTime) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
- return getLatestBaseFilesFromParams(paramsMap,
LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL);
+ return
getLatestBaseFilesFromParams(LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL,
paramsMap);
}
@Override
@@ -305,35 +284,30 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String
instantTime, String fileId) {
Map<String, String> paramsMap =
getParamsWithAdditionalParams(partitionPath,
new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime,
fileId});
- try {
- List<BaseFileDTO> dataFiles =
executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
- BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
- return Option.fromJavaOptional(dataFiles.stream()
- .map(BaseFileDTO::toHoodieBaseFile)
- .findFirst());
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_DATA_FILE_ON_INSTANT_URL,
paramsMap).findFirst());
}
@Override
public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String>
commitsToReturn) {
- Map<String, String> paramsMap =
- getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new
String[0]), ","));
- return getLatestBaseFilesFromParams(paramsMap,
LATEST_DATA_FILES_RANGE_INSTANT_URL);
+ Map<String, String> paramsMap = getParams(INSTANTS_PARAM,
String.join(MULTI_VALUE_SEPARATOR, commitsToReturn));
+ return getLatestBaseFilesFromParams(LATEST_DATA_FILES_RANGE_INSTANT_URL,
paramsMap);
}
@Override
public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- return getLatestBaseFilesFromParams(paramsMap, ALL_DATA_FILES);
+ return getLatestBaseFilesFromParams(ALL_DATA_FILES_URL, paramsMap);
}
@Override
- public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
- Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String
fileId) {
+ Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
+ return
Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILE_URL,
paramsMap).findFirst());
+ }
+
+ private Stream<FileSlice> getLatestFileSlicesStreamFromParams(String
requestPath, Map<String, String> paramsMap) {
try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap,
+ List<FileSliceDTO> dataFiles = executeRequest(requestPath, paramsMap,
FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
return dataFiles.stream().map(FileSliceDTO::toFileSlice);
} catch (IOException e) {
@@ -341,52 +315,34 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
+ Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ return getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_URL,
paramsMap);
+ }
+
@Override
public Stream<FileSlice> getLatestFileSlicesIncludingInflight(String
partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICES_INFLIGHT_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_INFLIGHT_URL,
paramsMap);
}
@Override
public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap,
- new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_STATELESS_URL,
paramsMap);
}
@Override
public Option<FileSlice> getLatestFileSlice(String partitionPath, String
fileId) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return
Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst());
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
Option.fromJavaOptional(getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICE_URL,
paramsMap).findFirst());
}
@Override
public Stream<FileSlice> getLatestUnCompactedFileSlices(String
partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getLatestFileSlicesStreamFromParams(LATEST_PARTITION_UNCOMPACTED_SLICES_URL,
paramsMap);
}
@Override
@@ -395,13 +351,7 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
Map<String, String> paramsMap =
getParamsWithAdditionalParams(partitionPath,
new String[] {MAX_INSTANT_PARAM,
INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
new String[] {maxCommitTime,
String.valueOf(includeFileSlicesInPendingCompaction)});
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getLatestFileSlicesStreamFromParams(LATEST_SLICES_BEFORE_ON_INSTANT_URL,
paramsMap);
}
@Override
@@ -425,35 +375,26 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
@Override
public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String
partitionPath, String maxInstantTime) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getLatestFileSlicesStreamFromParams(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL,
paramsMap);
}
@Override
public Stream<FileSlice> getLatestFileSliceInRange(List<String>
commitsToReturn) {
- Map<String, String> paramsMap =
- getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new
String[0]), ","));
- try {
- List<FileSliceDTO> dataFiles =
executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap,
- FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ Map<String, String> paramsMap = getParams(INSTANTS_PARAM,
String.join(MULTI_VALUE_SEPARATOR, commitsToReturn));
+ return
getLatestFileSlicesStreamFromParams(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap);
}
@Override
public Stream<FileSlice> getAllFileSlices(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
+ return getLatestFileSlicesStreamFromParams(ALL_SLICES_URL, paramsMap);
+ }
+
+ private Stream<HoodieFileGroup>
getAllFileGroupsForPartitionFromParams(String requestPath, Map<String, String>
paramsMap) {
try {
- List<FileSliceDTO> dataFiles =
- executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE,
RequestMethod.GET);
- return dataFiles.stream().map(FileSliceDTO::toFileSlice);
+ List<FileGroupDTO> fileGroups = executeRequest(requestPath, paramsMap,
+ FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
+ return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
@@ -462,73 +403,37 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
@Override
public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap,
- FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_URL,
paramsMap);
}
@Override
public Stream<HoodieFileGroup> getAllFileGroupsStateless(String
partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap,
- new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL,
paramsMap);
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String
maxCommitTime, String partitionPath) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap,
- FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL,
paramsMap);
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String
maxCommitTime, String partitionPath) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
- FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_URL,
paramsMap);
}
@Override
public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String
minCommitTime, String partitionPath) {
Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap,
- FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL,
paramsMap);
}
@Override
public Stream<HoodieFileGroup> getAllReplacedFileGroups(String
partitionPath) {
Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
- try {
- List<FileGroupDTO> fileGroups =
executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
- FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
- return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return
getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_PARTITION_URL,
paramsMap);
}
public boolean refresh() {
@@ -536,38 +441,40 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
try {
// refresh the local timeline first.
this.timeline =
metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
- return executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE,
RequestMethod.POST);
+ return executeRequest(REFRESH_TABLE_URL, paramsMap,
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}
- @Override
- public void loadAllPartitions() {
- Map<String, String> paramsMap = getParams();
+ private void loadPartitions(String requestPath, Map<String, String>
paramsMap) {
try {
- executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap,
BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
+ executeRequest(requestPath, paramsMap, BOOLEAN_TYPE_REFERENCE,
RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}
+ @Override
+ public void loadAllPartitions() {
+ Map<String, String> paramsMap = getParams();
+ loadPartitions(LOAD_ALL_PARTITIONS_URL, paramsMap);
+ }
+
@Override
public void loadPartitions(List<String> partitionPaths) {
+ Map<String, String> paramsMap = getParams();
try {
- Map<String, String> paramsMap = getParams();
paramsMap.put(PARTITIONS_PARAM,
OBJECT_MAPPER.writeValueAsString(partitionPaths));
- executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE,
RequestMethod.POST);
- } catch (IOException e) {
+ } catch (JsonProcessingException e) {
throw new HoodieRemoteException(e);
}
+ loadPartitions(LOAD_PARTITIONS_URL, paramsMap);
}
- @Override
- public Stream<Pair<String, CompactionOperation>>
getPendingCompactionOperations() {
- Map<String, String> paramsMap = getParams();
+ private Stream<Pair<String, CompactionOperation>>
getPendingCompactionOperations(String requestPath, Map<String, String>
paramsMap) {
try {
- List<CompactionOpDTO> dtos = executeRequest(PENDING_COMPACTION_OPS,
paramsMap,
+ List<CompactionOpDTO> dtos = executeRequest(requestPath, paramsMap,
COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
} catch (IOException e) {
@@ -575,23 +482,23 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
+ @Override
+ public Stream<Pair<String, CompactionOperation>>
getPendingCompactionOperations() {
+ Map<String, String> paramsMap = getParams();
+ return getPendingCompactionOperations(PENDING_COMPACTION_OPS_URL,
paramsMap);
+ }
+
@Override
public Stream<Pair<String, CompactionOperation>>
getPendingLogCompactionOperations() {
Map<String, String> paramsMap = getParams();
- try {
- List<CompactionOpDTO> dtos = executeRequest(PENDING_LOG_COMPACTION_OPS,
paramsMap,
- COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
- return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ return getPendingCompactionOperations(PENDING_LOG_COMPACTION_OPS_URL,
paramsMap);
}
@Override
public Stream<Pair<HoodieFileGroupId, HoodieInstant>>
getFileGroupsInPendingClustering() {
Map<String, String> paramsMap = getParams();
try {
- List<ClusteringOpDTO> dtos =
executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap,
+ List<ClusteringOpDTO> dtos =
executeRequest(PENDING_CLUSTERING_FILEGROUPS_URL, paramsMap,
CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET);
return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
} catch (IOException e) {
@@ -599,22 +506,11 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
}
}
- @Override
- public void close() {
- closed = true;
- }
-
- @Override
- public void reset() {
- refresh();
- }
-
@Override
public Option<HoodieInstant> getLastInstant() {
Map<String, String> paramsMap = getParams();
try {
- List<InstantDTO> instants =
- executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE,
RequestMethod.GET);
+ List<InstantDTO> instants = executeRequest(LAST_INSTANT_URL, paramsMap,
INSTANT_DTOS_REFERENCE, RequestMethod.GET);
return
Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst());
} catch (IOException e) {
throw new HoodieRemoteException(e);
@@ -625,31 +521,26 @@ public class RemoteHoodieTableFileSystemView implements
SyncableFileSystemView,
public HoodieTimeline getTimeline() {
Map<String, String> paramsMap = getParams();
try {
- TimelineDTO timeline =
- executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE,
RequestMethod.GET);
- return TimelineDTO.toTimeline(timeline, metaClient);
+ TimelineDTO timelineDto = executeRequest(TIMELINE_URL, paramsMap,
TIMELINE_DTO_REFERENCE, RequestMethod.GET);
+ return TimelineDTO.toTimeline(timelineDto, metaClient);
} catch (IOException e) {
throw new HoodieRemoteException(e);
}
}
@Override
- public void sync() {
+ public void close() {
+ closed = true;
+ }
+
+ @Override
+ public void reset() {
refresh();
}
@Override
- public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String
fileId) {
- Map<String, String> paramsMap =
getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
- try {
- List<BaseFileDTO> dataFiles =
executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
- BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
- return Option.fromJavaOptional(dataFiles.stream()
- .map(BaseFileDTO::toHoodieBaseFile)
- .findFirst());
- } catch (IOException e) {
- throw new HoodieRemoteException(e);
- }
+ public void sync() {
+ refresh();
}
private Response get(int timeoutMs, String url, RequestMethod method) throws
IOException {
diff --git
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index ddaa9582f7f..be0b34d53e1 100644
---
a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++
b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -137,13 +137,52 @@ public class RequestHandler {
metricsRegistry.add("WRITE_VALUE_CNT", 1);
metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime);
if (logger.isDebugEnabled()) {
- logger.debug("Jsonify TimeTaken=" + jsonifyTime);
+ logger.debug("Jsonify TimeTaken={}", jsonifyTime);
}
return result;
}
- private static boolean isRefreshCheckDisabledInQuery(Context ctxt) {
- return
Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF));
+ private static String getBasePathParam(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+ }
+
+ private static String getPartitionParam(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault("");
+ }
+
+ private static String getFileIdParam(Context ctx) {
+ return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM,
String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"));
+ }
+
+ private static List<String> getInstantsParam(Context ctx) {
+ return
Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM,
String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid"))
+ .split(RemoteHoodieTableFileSystemView.MULTI_VALUE_SEPARATOR));
+ }
+
+ private static String getMaxInstantParamMandatory(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid"));
+ }
+
+ private static String getMaxInstantParamOptional(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrDefault("");
+ }
+
+ private static String getMinInstantParam(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM,
String.class).getOrDefault("");
+ }
+
+ private static String getMarkerDirParam(Context ctx) {
+ return ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault("");
+ }
+
+ private static boolean getIncludeFilesInPendingCompactionParam(Context ctx) {
+ return Boolean.parseBoolean(
+
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM,
String.class)
+ .getOrThrow(e -> new
HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid")));
+ }
+
+ private static String getInstantStateDirPathParam(Context ctx) {
+ return ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM);
}
public void register() {
@@ -164,59 +203,6 @@ public class RequestHandler {
}
}
- /**
- * Determines if local view of table's timeline is behind that of client's
view.
- */
- private boolean isLocalViewBehind(Context ctx) {
- String basePath =
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
- String lastKnownInstantFromClient =
- ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS,
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
- String timelineHashFromClient =
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH,
String.class).getOrDefault("");
- HoodieTimeline localTimeline =
-
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ",
TimelineHash=" + timelineHashFromClient
- + "], localTimeline=" + localTimeline.getInstants());
- }
-
- if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
- &&
HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
- return false;
- }
-
- String localTimelineHash = localTimeline.getTimelineHash();
- // refresh if timeline hash mismatches
- if (!localTimelineHash.equals(timelineHashFromClient)) {
- return true;
- }
-
- // As a safety check, even if hash is same, ensure instant is present
- return
!localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
- }
-
- /**
- * Syncs data-set view if local view is behind.
- */
- private boolean syncIfLocalViewBehind(Context ctx) {
- String basePath =
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
- SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
- synchronized (view) {
- if (isLocalViewBehind(ctx)) {
-
- String lastKnownInstantFromClient = ctx.queryParamAsClass(
- RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class)
- .getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
- HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline();
- LOG.info("Syncing view as client passed last known instant " +
lastKnownInstantFromClient
- + " as last known instant but server has the following last
instant on timeline :"
- + localTimeline.lastInstant());
- view.sync();
- return true;
- }
- }
- return false;
- }
-
private void writeValueAsString(Context ctx, Object obj) throws
JsonProcessingException {
if (timelineServiceConfig.async) {
writeValueAsStringAsync(ctx, obj);
@@ -244,15 +230,15 @@ public class RequestHandler {
* Register Timeline API calls.
*/
private void registerTimelineAPI() {
- app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx
-> {
+ app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LAST_INSTANT", 1);
- List<InstantDTO> dtos =
instantHandler.getLastInstant(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).get());
+ List<InstantDTO> dtos =
instantHandler.getLastInstant(getBasePathParam(ctx));
writeValueAsString(ctx, dtos);
}, false));
- app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.TIMELINE_URL, new ViewHandler(ctx
-> {
metricsRegistry.add("TIMELINE", 1);
- TimelineDTO dto =
instantHandler.getTimeline(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).get());
+ TimelineDTO dto = instantHandler.getTimeline(getBasePathParam(ctx));
writeValueAsString(ctx, dto);
}, false));
}
@@ -264,68 +250,66 @@ public class RequestHandler {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
-
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
- ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM,
String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
+ getFileIdParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new
ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LATEST_ALL_DATA_FILES", 1);
- List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ List<BaseFileDTO> dtos =
dataFileHandler.getLatestDataFiles(getBasePathParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
+ getMaxInstantParamMandatory(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1);
Map<String, List<BaseFileDTO>> dtos =
dataFileHandler.getAllLatestDataFilesBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ getBasePathParam(ctx),
+ getMaxInstantParamMandatory(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_PARAM,
String.class).get(),
- ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM,
String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+ getFileIdParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new
ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("ALL_DATA_FILES", 1);
List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1);
List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM,
String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is
invalid")).split(",")));
+ getBasePathParam(ctx),
+ getInstantsParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
}
@@ -337,129 +321,124 @@ public class RequestHandler {
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_INFLIGHT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICES_INFLIGHT", 1);
List<FileSliceDTO> dtos =
sliceHandler.getLatestFileSlicesIncludingInflight(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_SLICE", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
- ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM,
String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
+ getFileIdParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("ALL_SLICES", 1);
List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM,
String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is
invalid")).split(",")));
+ getBasePathParam(ctx),
+ getInstantsParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos =
sliceHandler.getLatestMergedFileSlicesBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
+ getMaxInstantParamMandatory(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1);
List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")),
- Boolean.parseBoolean(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM,
String.class)
- .getOrThrow(e -> new
HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid"))));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx),
+ getMaxInstantParamMandatory(ctx),
+ getIncludeFilesInPendingCompactionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_LATEST_SLICES_BEFORE_ON_INSTANT", 1);
Map<String, List<FileSliceDTO>> dtos =
sliceHandler.getAllLatestFileSlicesBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is
invalid")));
+ getBasePathParam(ctx),
+ getMaxInstantParamMandatory(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new
ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("PEDING_COMPACTION_OPS", 1);
- List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ List<CompactionOpDTO> dtos =
sliceHandler.getPendingCompactionOperations(getBasePathParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS, new
ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("PEDING_LOG_COMPACTION_OPS", 1);
- List<CompactionOpDTO> dtos =
sliceHandler.getPendingLogCompactionOperations(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ List<CompactionOpDTO> dtos =
sliceHandler.getPendingLogCompactionOperations(getBasePathParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new
ViewHandler(ctx -> {
+ app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("REFRESH_TABLE", 1);
- boolean success = sliceHandler
-
.refreshTable(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ boolean success = sliceHandler.refreshTable(getBasePathParam(ctx));
writeValueAsString(ctx, success);
}, false));
app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LOAD_PARTITIONS", 1);
- String basePath =
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"));
+ String basePath = getBasePathParam(ctx);
try {
List<String> partitionPaths =
OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM,
String.class)
.getOrThrow(e -> new HoodieException("Partitions param is
invalid")), LIST_TYPE_REFERENCE);
@@ -472,50 +451,48 @@ public class RequestHandler {
app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("LOAD_ALL_PARTITIONS", 1);
- boolean success = sliceHandler
-
.loadAllPartitions(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ boolean success = sliceHandler.loadAllPartitions(getBasePathParam(ctx));
writeValueAsString(ctx, success);
}, false));
-
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON,
new ViewHandler(ctx -> {
+
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getMaxInstantParamOptional(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE,
new ViewHandler(ctx -> {
+
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_URL, new
ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getMaxInstantParamOptional(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
-
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON,
new ViewHandler(ctx -> {
+
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_AFTER_OR_ON", 1);
List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsAfterOrOn(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM,
String.class).getOrDefault(""),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getMinInstantParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION,
new ViewHandler(ctx -> {
+
app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1);
List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")),
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM,
String.class).getOrDefault(""));
+ getBasePathParam(ctx),
+ getPartitionParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
- app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new
ViewHandler(ctx -> {
+ app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS_URL,
new ViewHandler(ctx -> {
metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1);
- List<ClusteringOpDTO> dtos =
sliceHandler.getFileGroupsInPendingClustering(
-
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM,
String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")));
+ List<ClusteringOpDTO> dtos =
sliceHandler.getFileGroupsInPendingClustering(getBasePathParam(ctx));
writeValueAsString(ctx, dtos);
}, true));
}
@@ -523,22 +500,19 @@ public class RequestHandler {
private void registerMarkerAPI() {
app.get(MarkerOperation.ALL_MARKERS_URL, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_MARKERS", 1);
- Set<String> markers = markerHandler.getAllMarkers(
- ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault(""));
+ Set<String> markers =
markerHandler.getAllMarkers(getMarkerDirParam(ctx));
writeValueAsString(ctx, markers);
}, false));
app.get(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, new ViewHandler(ctx
-> {
metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1);
- Set<String> markers = markerHandler.getCreateAndMergeMarkers(
- ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault(""));
+ Set<String> markers =
markerHandler.getCreateAndMergeMarkers(getMarkerDirParam(ctx));
writeValueAsString(ctx, markers);
}, false));
app.get(MarkerOperation.MARKERS_DIR_EXISTS_URL, new ViewHandler(ctx -> {
metricsRegistry.add("MARKERS_DIR_EXISTS", 1);
- boolean exist = markerHandler.doesMarkerDirExist(
- ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault(""));
+ boolean exist = markerHandler.doesMarkerDirExist(getMarkerDirParam(ctx));
writeValueAsString(ctx, exist);
}, false));
@@ -546,15 +520,14 @@ public class RequestHandler {
metricsRegistry.add("CREATE_MARKER", 1);
ctx.future(markerHandler.createMarker(
ctx,
- ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault(""),
+ getMarkerDirParam(ctx),
ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM,
String.class).getOrDefault(""),
ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM,
String.class).getOrDefault("")));
}, false));
app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> {
metricsRegistry.add("DELETE_MARKER_DIR", 1);
- boolean success = markerHandler.deleteMarkers(
- ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM,
String.class).getOrDefault(""));
+ boolean success = markerHandler.deleteMarkers(getMarkerDirParam(ctx));
writeValueAsString(ctx, success);
}, false));
}
@@ -562,32 +535,17 @@ public class RequestHandler {
private void registerInstantStateAPI() {
app.get(InstantStateHandler.ALL_INSTANT_STATE_URL, new ViewHandler(ctx -> {
metricsRegistry.add("ALL_INSTANT_STATE", 1);
- List<InstantStateDTO> instantStates =
instantStateHandler.getAllInstantStates(
- ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
- );
+ List<InstantStateDTO> instantStates =
instantStateHandler.getAllInstantStates(getInstantStateDirPathParam(ctx));
writeValueAsString(ctx, instantStates);
}, false));
app.post(InstantStateHandler.REFRESH_INSTANT_STATE, new ViewHandler(ctx ->
{
metricsRegistry.add("REFRESH_INSTANT_STATE", 1);
- boolean success = instantStateHandler.refresh(
- ctx.queryParam(InstantStateHandler.INSTANT_STATE_DIR_PATH_PARAM)
- );
+ boolean success =
instantStateHandler.refresh(getInstantStateDirPathParam(ctx));
writeValueAsString(ctx, success);
}, false));
}
- /**
- * Determine whether to throw an exception when local view of table's
timeline is behind that of client's view.
- */
- private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline
localTimeline, String timelineHashFromClient) {
- Option<HoodieInstant> lastInstant = localTimeline.lastInstant();
- // When performing async clean, we may have one more .clean.completed
after lastInstantTs.
- // In this case, we do not need to throw an exception.
- return !lastInstant.isPresent() ||
!lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION)
- ||
!localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient);
- }
-
/**
* Used for logging and performing refresh check.
*/
@@ -634,16 +592,13 @@ public class RequestHandler {
if (refreshCheck) {
long beginFinalCheck = System.currentTimeMillis();
if (isLocalViewBehind(context)) {
- String lastKnownInstantFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS,
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
- String timelineHashFromClient =
context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH,
String.class).getOrDefault("");
+ String lastKnownInstantFromClient =
getLastInstantTsParam(context);
+ String timelineHashFromClient = getTimelineHashParam(context);
HoodieTimeline localTimeline =
viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline();
if (shouldThrowExceptionIfLocalViewBehind(localTimeline,
timelineHashFromClient)) {
- String errMsg =
- "Last known instant from client was "
- + lastKnownInstantFromClient
- + " but server has the following timeline "
- + localTimeline.getInstants();
+ String errMsg = String.format("Last known instant from client
was %s but server has the following timeline %s",
+ lastKnownInstantFromClient,
localTimeline.getInstants());
throw new BadRequestResponse(errMsg);
}
}
@@ -653,9 +608,9 @@ public class RequestHandler {
} catch (RuntimeException re) {
success = false;
if (re instanceof BadRequestResponse) {
- LOG.warn("Bad request response due to client view behind server
view. " + re.getMessage());
+ LOG.warn("Bad request response due to client view behind server
view. {}", re.getMessage());
} else {
- LOG.error("Got runtime exception servicing request " +
context.queryString(), re);
+ LOG.error(String.format("Got runtime exception servicing request
%s", context.queryString()), re);
}
throw re;
} finally {
@@ -667,14 +622,85 @@ public class RequestHandler {
metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken);
metricsRegistry.add("TOTAL_API_CALLS", 1);
- LOG.debug(String.format(
- "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], "
- + "Success=%s, Query=%s, Host=%s, synced=%s",
- timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken,
finalCheckTimeTaken, success,
- context.queryString(), context.host(), synced));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TimeTakenMillis[Total={}, Refresh={}, handle={},
Check={}], Success={}, Query={}, Host={}, synced={}",
+ timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken,
finalCheckTimeTaken, success, context.queryString(), context.host(), synced);
+ }
}
return null;
});
}
+
+ /**
+ * Determines if local view of table's timeline is behind that of client's
view.
+ */
+ private boolean isLocalViewBehind(Context ctx) {
+ String basePath =
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
+ String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
+ String timelineHashFromClient = getTimelineHashParam(ctx);
+ HoodieTimeline localTimeline =
+
viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Client [ LastTs={}, TimelineHash={}],
localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient,
localTimeline.getInstants());
+ }
+
+ if ((!localTimeline.getInstantsAsStream().findAny().isPresent())
+ &&
HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) {
+ return false;
+ }
+
+ String localTimelineHash = localTimeline.getTimelineHash();
+ // refresh if timeline hash mismatches
+ if (!localTimelineHash.equals(timelineHashFromClient)) {
+ return true;
+ }
+
+ // As a safety check, even if hash is same, ensure instant is present
+ return
!localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient);
+ }
+
+ /**
+ * Syncs data-set view if local view is behind.
+ */
+ private boolean syncIfLocalViewBehind(Context ctx) {
+ String basePath =
ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM);
+ SyncableFileSystemView view = viewManager.getFileSystemView(basePath);
+ synchronized (view) {
+ if (isLocalViewBehind(ctx)) {
+ String lastKnownInstantFromClient = getLastInstantTsParam(ctx);
+ HoodieTimeline localTimeline =
viewManager.getFileSystemView(basePath).getTimeline();
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Syncing view as client passed last known instant {} as
last known instant but server has the following last instant on timeline: {}",
+ lastKnownInstantFromClient, localTimeline.lastInstant());
+ }
+ view.sync();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Determine whether to throw an exception when local view of table's
timeline is behind that of client's view.
+ */
+ private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline
localTimeline, String timelineHashFromClient) {
+ Option<HoodieInstant> lastInstant = localTimeline.lastInstant();
+ // When performing async clean, we may have one more .clean.completed
after lastInstantTs.
+ // In this case, we do not need to throw an exception.
+ return !lastInstant.isPresent() ||
!lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION)
+ ||
!localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient);
+ }
+
+ private boolean isRefreshCheckDisabledInQuery(Context ctx) {
+ return
Boolean.parseBoolean(ctx.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF));
+ }
+
+ private String getLastInstantTsParam(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS,
String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS);
+ }
+
+ private String getTimelineHashParam(Context ctx) {
+ return
ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH,
String.class).getOrDefault("");
+ }
}
}