This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 273c7332648c05f06a871dbadbd1d6f6acab917b Author: Vova Kolmakov <[email protected]> AuthorDate: Wed May 15 00:56:54 2024 -0700 [HUDI-7623] Refactoring of RemoteHoodieTableFileSystemView and RequestHandler (#11032) --- .../view/RemoteHoodieTableFileSystemView.java | 333 +++++++----------- .../hudi/timeline/service/RequestHandler.java | 374 +++++++++++---------- 2 files changed, 315 insertions(+), 392 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 6c8295fd75f..7de9119992e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -36,11 +36,11 @@ import org.apache.hudi.common.table.timeline.dto.InstantDTO; import org.apache.hudi.common.table.timeline.dto.TimelineDTO; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.RetryHelper; -import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieRemoteException; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; @@ -66,66 +66,46 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); + private static final String SCHEME = "http"; private static final String BASE_URL = "/v1/hoodie/view"; public static final String LATEST_PARTITION_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/"); public static final String LATEST_PARTITION_SLICES_STATELESS_URL = String.format("%s/%s", BASE_URL, "slices/partition/latest/stateless/"); public static final String LATEST_PARTITION_SLICE_URL = String.format("%s/%s", BASE_URL, "slices/file/latest/"); - public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL = - String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/"); + public static final String LATEST_PARTITION_UNCOMPACTED_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/uncompacted/partition/latest/"); public static final String ALL_SLICES_URL = String.format("%s/%s", BASE_URL, "slices/all"); - public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL = - String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/"); + public static final String LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "slices/merged/beforeoron/latest/"); public static final String LATEST_SLICES_RANGE_INSTANT_URL = String.format("%s/%s", BASE_URL, "slices/range/latest/"); - public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL = - String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/"); - public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL = - String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/"); - - public static final String PENDING_COMPACTION_OPS = String.format("%s/%s", BASE_URL, "compactions/pending/"); - public static final String PENDING_LOG_COMPACTION_OPS = String.format("%s/%s", BASE_URL, "logcompactions/pending/"); - - public static final String LATEST_PARTITION_DATA_FILES_URL = - String.format("%s/%s", BASE_URL, "datafiles/latest/partition"); - public static final String LATEST_PARTITION_DATA_FILE_URL = - String.format("%s/%s", BASE_URL, "datafile/latest/partition"); - public static final String ALL_DATA_FILES = String.format("%s/%s", BASE_URL, "datafiles/all"); - public static final String LATEST_ALL_DATA_FILES = String.format("%s/%s", BASE_URL, "datafiles/all/latest/"); - public static final String LATEST_DATA_FILE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "datafile/on/latest/"); - - public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL = - String.format("%s/%s", BASE_URL, "datafiles/range/latest/"); - public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL = - String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/"); - public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL = - String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/"); - - public static final String ALL_FILEGROUPS_FOR_PARTITION_URL = - String.format("%s/%s", BASE_URL, "filegroups/all/partition/"); - - public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL = - String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/"); - - public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON = - String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/"); - - public static final String ALL_REPLACED_FILEGROUPS_BEFORE = - String.format("%s/%s", BASE_URL, "filegroups/replaced/before/"); + public static final String LATEST_SLICES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "slices/beforeoron/latest/"); + public static final String ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "slices/all/beforeoron/latest/"); - public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON = - String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/"); + public static final String PENDING_COMPACTION_OPS_URL = String.format("%s/%s", BASE_URL, "compactions/pending/"); + public static final String PENDING_LOG_COMPACTION_OPS_URL = String.format("%s/%s", BASE_URL, "logcompactions/pending/"); - public static final String ALL_REPLACED_FILEGROUPS_PARTITION = - String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/"); + public static final String LATEST_PARTITION_DATA_FILES_URL = String.format("%s/%s", BASE_URL, "datafiles/latest/partition"); + public static final String LATEST_PARTITION_DATA_FILE_URL = String.format("%s/%s", BASE_URL, "datafile/latest/partition"); + public static final String ALL_DATA_FILES_URL = String.format("%s/%s", BASE_URL, "datafiles/all"); + public static final String LATEST_ALL_DATA_FILES_URL = String.format("%s/%s", BASE_URL, "datafiles/all/latest/"); + public static final String LATEST_DATA_FILE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "datafile/on/latest/"); + public static final String LATEST_DATA_FILES_RANGE_INSTANT_URL = String.format("%s/%s", BASE_URL, "datafiles/range/latest/"); + public static final String LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "datafiles/beforeoron/latest/"); + public static final String ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL = String.format("%s/%s", BASE_URL, "basefiles/all/beforeoron/"); + + public static final String ALL_FILEGROUPS_FOR_PARTITION_URL = String.format("%s/%s", BASE_URL, "filegroups/all/partition/"); + public static final String ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL = String.format("%s/%s", BASE_URL, "filegroups/all/partition/stateless/"); + public static final String ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL = String.format("%s/%s", BASE_URL, "filegroups/replaced/beforeoron/"); + public static final String ALL_REPLACED_FILEGROUPS_BEFORE_URL = String.format("%s/%s", BASE_URL, "filegroups/replaced/before/"); + public static final String ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL = String.format("%s/%s", BASE_URL, "filegroups/replaced/afteroron/"); + public static final String ALL_REPLACED_FILEGROUPS_PARTITION_URL = String.format("%s/%s", BASE_URL, "filegroups/replaced/partition/"); - public static final String PENDING_CLUSTERING_FILEGROUPS = String.format("%s/%s", BASE_URL, "clustering/pending/"); + public static final String PENDING_CLUSTERING_FILEGROUPS_URL = String.format("%s/%s", BASE_URL, "clustering/pending/"); - public static final String LAST_INSTANT = String.format("%s/%s", BASE_URL, "timeline/instant/last"); - public static final String LAST_INSTANTS = String.format("%s/%s", BASE_URL, "timeline/instants/last"); + public static final String LAST_INSTANT_URL = String.format("%s/%s", BASE_URL, "timeline/instant/last"); + public static final String LAST_INSTANTS_URL = String.format("%s/%s", BASE_URL, "timeline/instants/last"); - public static final String TIMELINE = String.format("%s/%s", BASE_URL, "timeline/instants/all"); + public static final String TIMELINE_URL = String.format("%s/%s", BASE_URL, "timeline/instants/all"); // POST Requests - public static final String REFRESH_TABLE = String.format("%s/%s", BASE_URL, "refresh/"); + public static final String REFRESH_TABLE_URL = String.format("%s/%s", BASE_URL, "refresh/"); public static final String LOAD_ALL_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadallpartitions/"); public static final String LOAD_PARTITIONS_URL = String.format("%s/%s", BASE_URL, "loadpartitions/"); @@ -142,6 +122,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public static final String REFRESH_OFF = "refreshoff"; public static final String INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM = "includependingcompaction"; + public static final String MULTI_VALUE_SEPARATOR = ","; private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class); private static final TypeReference<List<FileSliceDTO>> FILE_SLICE_DTOS_REFERENCE = new TypeReference<List<FileSliceDTO>>() {}; @@ -175,7 +156,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) { - this.basePath = metaClient.getBasePath(); + this.basePath = metaClient.getBasePathV2().toString(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); this.serverHost = viewConf.getRemoteViewServerHost(); @@ -195,9 +176,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, RequestMethod method) throws IOException { ValidationUtils.checkArgument(!closed, "View already closed"); - URIBuilder builder = - new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http"); - + URIBuilder builder = new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme(SCHEME); queryParameters.forEach(builder::addParameter); // Adding mandatory parameters - Last instants affecting file-slice @@ -205,7 +184,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, builder.addParameter(TIMELINE_HASH, timeline.getTimelineHash()); String url = builder.toString(); - LOG.info("Sending request : (" + url + ")"); + LOG.info("Sending request : ({})", url); Response response = retryHelper != null ? retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method); String content = response.returnContent().asString(Consts.UTF_8); return (T) OBJECT_MAPPER.readValue(content, reference); @@ -251,32 +230,32 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, return paramsMap; } + private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(String requestPath, Map<String, String> paramsMap) { + try { + List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap, + BASE_FILE_DTOS_REFERENCE, RequestMethod.GET); + return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile); + } catch (IOException e) { + throw new HoodieRemoteException(e); + } + } + @Override public Stream<HoodieBaseFile> getLatestBaseFiles(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - return getLatestBaseFilesFromParams(paramsMap, LATEST_PARTITION_DATA_FILES_URL); + return getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILES_URL, paramsMap); } @Override public Stream<HoodieBaseFile> getLatestBaseFiles() { Map<String, String> paramsMap = getParams(); - return getLatestBaseFilesFromParams(paramsMap, LATEST_ALL_DATA_FILES); - } - - private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String, String> paramsMap, String requestPath) { - try { - List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap, - BASE_FILE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getLatestBaseFilesFromParams(LATEST_ALL_DATA_FILES_URL, paramsMap); } @Override public Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitionPath, String maxCommitTime) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); - return getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL); + return getLatestBaseFilesFromParams(LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, paramsMap); } @Override @@ -304,35 +283,30 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instantTime, String fileId) { Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath, new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime, fileId}); - try { - List<BaseFileDTO> dataFiles = executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap, - BASE_FILE_DTOS_REFERENCE, RequestMethod.GET); - return Option.fromJavaOptional(dataFiles.stream() - .map(BaseFileDTO::toHoodieBaseFile) - .findFirst()); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap).findFirst()); } @Override public Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commitsToReturn) { - Map<String, String> paramsMap = - getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ",")); - return getLatestBaseFilesFromParams(paramsMap, LATEST_DATA_FILES_RANGE_INSTANT_URL); + Map<String, String> paramsMap = getParams(INSTANTS_PARAM, String.join(MULTI_VALUE_SEPARATOR, commitsToReturn)); + return getLatestBaseFilesFromParams(LATEST_DATA_FILES_RANGE_INSTANT_URL, paramsMap); } @Override public Stream<HoodieBaseFile> getAllBaseFiles(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - return getLatestBaseFilesFromParams(paramsMap, ALL_DATA_FILES); + return getLatestBaseFilesFromParams(ALL_DATA_FILES_URL, paramsMap); } @Override - public Stream<FileSlice> getLatestFileSlices(String partitionPath) { - Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); + public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) { + Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId); + return Option.fromJavaOptional(getLatestBaseFilesFromParams(LATEST_PARTITION_DATA_FILE_URL, paramsMap).findFirst()); + } + + private Stream<FileSlice> getLatestFileSlicesStreamFromParams(String requestPath, Map<String, String> paramsMap) { try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap, + List<FileSliceDTO> dataFiles = executeRequest(requestPath, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); return dataFiles.stream().map(FileSliceDTO::toFileSlice); } catch (IOException e) { @@ -340,40 +314,28 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } } + @Override + public Stream<FileSlice> getLatestFileSlices(String partitionPath) { + Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); + return getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_URL, paramsMap); + } + @Override public Stream<FileSlice> getLatestFileSlicesStateless(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap, - new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICES_STATELESS_URL, paramsMap); } @Override public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap, - FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst()); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return Option.fromJavaOptional(getLatestFileSlicesStreamFromParams(LATEST_PARTITION_SLICE_URL, paramsMap).findFirst()); } @Override public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap, - FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getLatestFileSlicesStreamFromParams(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap); } @Override @@ -382,13 +344,7 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, Map<String, String> paramsMap = getParamsWithAdditionalParams(partitionPath, new String[] {MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM}, new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)}); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, - FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getLatestFileSlicesStreamFromParams(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap); } @Override @@ -412,35 +368,26 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, @Override public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPath, String maxInstantTime) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap, - FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getLatestFileSlicesStreamFromParams(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap); } @Override public Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn) { - Map<String, String> paramsMap = - getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ",")); - try { - List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap, - FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + Map<String, String> paramsMap = getParams(INSTANTS_PARAM, String.join(MULTI_VALUE_SEPARATOR, commitsToReturn)); + return getLatestFileSlicesStreamFromParams(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap); } @Override public Stream<FileSlice> getAllFileSlices(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); + return getLatestFileSlicesStreamFromParams(ALL_SLICES_URL, paramsMap); + } + + private Stream<HoodieFileGroup> getAllFileGroupsForPartitionFromParams(String requestPath, Map<String, String> paramsMap) { try { - List<FileSliceDTO> dataFiles = - executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET); - return dataFiles.stream().map(FileSliceDTO::toFileSlice); + List<FileGroupDTO> fileGroups = executeRequest(requestPath, paramsMap, + FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); + return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } @@ -449,73 +396,37 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, @Override public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap, - FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap); } @Override public Stream<HoodieFileGroup> getAllFileGroupsStateless(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap, - new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, paramsMap); } @Override public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap, - FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL, paramsMap); } @Override public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime, String partitionPath) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap, - FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_BEFORE_URL, paramsMap); } @Override public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTime, String partitionPath) { Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap, - FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL, paramsMap); } @Override public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) { Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath); - try { - List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap, - FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET); - return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getAllFileGroupsForPartitionFromParams(ALL_REPLACED_FILEGROUPS_PARTITION_URL, paramsMap); } public boolean refresh() { @@ -523,38 +434,40 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, try { // refresh the local timeline first. this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants(); - return executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); + return executeRequest(REFRESH_TABLE_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException(e); } } - @Override - public void loadAllPartitions() { - Map<String, String> paramsMap = getParams(); + private void loadPartitions(String requestPath, Map<String, String> paramsMap) { try { - executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); + executeRequest(requestPath, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException(e); } } + @Override + public void loadAllPartitions() { + Map<String, String> paramsMap = getParams(); + loadPartitions(LOAD_ALL_PARTITIONS_URL, paramsMap); + } + @Override public void loadPartitions(List<String> partitionPaths) { + Map<String, String> paramsMap = getParams(); try { - Map<String, String> paramsMap = getParams(); paramsMap.put(PARTITIONS_PARAM, OBJECT_MAPPER.writeValueAsString(partitionPaths)); - executeRequest(LOAD_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST); - } catch (IOException e) { + } catch (JsonProcessingException e) { throw new HoodieRemoteException(e); } + loadPartitions(LOAD_PARTITIONS_URL, paramsMap); } - @Override - public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() { - Map<String, String> paramsMap = getParams(); + private Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations(String requestPath, Map<String, String> paramsMap) { try { - List<CompactionOpDTO> dtos = executeRequest(PENDING_COMPACTION_OPS, paramsMap, + List<CompactionOpDTO> dtos = executeRequest(requestPath, paramsMap, COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET); return dtos.stream().map(CompactionOpDTO::toCompactionOperation); } catch (IOException e) { @@ -562,23 +475,23 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } } + @Override + public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations() { + Map<String, String> paramsMap = getParams(); + return getPendingCompactionOperations(PENDING_COMPACTION_OPS_URL, paramsMap); + } + @Override public Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperations() { Map<String, String> paramsMap = getParams(); - try { - List<CompactionOpDTO> dtos = executeRequest(PENDING_LOG_COMPACTION_OPS, paramsMap, - COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET); - return dtos.stream().map(CompactionOpDTO::toCompactionOperation); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + return getPendingCompactionOperations(PENDING_LOG_COMPACTION_OPS_URL, paramsMap); } @Override public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClustering() { Map<String, String> paramsMap = getParams(); try { - List<ClusteringOpDTO> dtos = executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap, + List<ClusteringOpDTO> dtos = executeRequest(PENDING_CLUSTERING_FILEGROUPS_URL, paramsMap, CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET); return dtos.stream().map(ClusteringOpDTO::toClusteringOperation); } catch (IOException e) { @@ -586,22 +499,11 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, } } - @Override - public void close() { - closed = true; - } - - @Override - public void reset() { - refresh(); - } - @Override public Option<HoodieInstant> getLastInstant() { Map<String, String> paramsMap = getParams(); try { - List<InstantDTO> instants = - executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE, RequestMethod.GET); + List<InstantDTO> instants = executeRequest(LAST_INSTANT_URL, paramsMap, INSTANT_DTOS_REFERENCE, RequestMethod.GET); return Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst()); } catch (IOException e) { throw new HoodieRemoteException(e); @@ -612,31 +514,26 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, public HoodieTimeline getTimeline() { Map<String, String> paramsMap = getParams(); try { - TimelineDTO timeline = - executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET); - return TimelineDTO.toTimeline(timeline, metaClient); + TimelineDTO timelineDto = executeRequest(TIMELINE_URL, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET); + return TimelineDTO.toTimeline(timelineDto, metaClient); } catch (IOException e) { throw new HoodieRemoteException(e); } } @Override - public void sync() { + public void close() { + closed = true; + } + + @Override + public void reset() { refresh(); } @Override - public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fileId) { - Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId); - try { - List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap, - BASE_FILE_DTOS_REFERENCE, RequestMethod.GET); - return Option.fromJavaOptional(dataFiles.stream() - .map(BaseFileDTO::toHoodieBaseFile) - .findFirst()); - } catch (IOException e) { - throw new HoodieRemoteException(e); - } + public void sync() { + refresh(); } private Response get(int timeoutMs, String url, RequestMethod method) throws IOException { diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java index 009a7bf848b..1a1ac5563ac 100644 --- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java +++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java @@ -129,15 +129,50 @@ public class RequestHandler { metricsRegistry.add("WRITE_VALUE_CNT", 1); metricsRegistry.add("WRITE_VALUE_TIME", jsonifyTime); if (logger.isDebugEnabled()) { - logger.debug("Jsonify TimeTaken=" + jsonifyTime); + logger.debug("Jsonify TimeTaken={}", jsonifyTime); } return result; } - private static boolean isRefreshCheckDisabledInQuery(Context ctxt) { - return Boolean.parseBoolean(ctxt.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF)); + private static String getBasePathParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")); } + private static String getPartitionParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""); + } + + private static String getFileIdParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid")); + } + + private static List<String> getInstantsParam(Context ctx) { + return Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")) + .split(RemoteHoodieTableFileSystemView.MULTI_VALUE_SEPARATOR)); + } + + private static String getMaxInstantParamMandatory(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")); + } + + private static String getMaxInstantParamOptional(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""); + } + + private static String getMinInstantParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM, String.class).getOrDefault(""); + } + + private static String getMarkerDirParam(Context ctx) { + return ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""); + } + + private static boolean getIncludeFilesInPendingCompactionParam(Context ctx) { + return Boolean.parseBoolean( + ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, String.class) + .getOrThrow(e -> new HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid"))); + } + public void register() { registerDataFilesAPI(); registerFileSlicesAPI(); @@ -153,59 +188,6 @@ public class RequestHandler { } } - /** - * Determines if local view of table's timeline is behind that of client's view. - */ - private boolean isLocalViewBehind(Context ctx) { - String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); - String lastKnownInstantFromClient = - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); - String timelineHashFromClient = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault(""); - HoodieTimeline localTimeline = - viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants(); - if (LOG.isDebugEnabled()) { - LOG.debug("Client [ LastTs=" + lastKnownInstantFromClient + ", TimelineHash=" + timelineHashFromClient - + "], localTimeline=" + localTimeline.getInstants()); - } - - if ((!localTimeline.getInstantsAsStream().findAny().isPresent()) - && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) { - return false; - } - - String localTimelineHash = localTimeline.getTimelineHash(); - // refresh if timeline hash mismatches - if (!localTimelineHash.equals(timelineHashFromClient)) { - return true; - } - - // As a safety check, even if hash is same, ensure instant is present - return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient); - } - - /** - * Syncs data-set view if local view is behind. - */ - private boolean syncIfLocalViewBehind(Context ctx) { - String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); - SyncableFileSystemView view = viewManager.getFileSystemView(basePath); - synchronized (view) { - if (isLocalViewBehind(ctx)) { - - String lastKnownInstantFromClient = ctx.queryParamAsClass( - RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class) - .getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); - HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); - LOG.info("Syncing view as client passed last known instant " + lastKnownInstantFromClient - + " as last known instant but server has the following last instant on timeline :" - + localTimeline.lastInstant()); - view.sync(); - return true; - } - } - return false; - } - private void writeValueAsString(Context ctx, Object obj) throws JsonProcessingException { if (timelineServiceConfig.async) { writeValueAsStringAsync(ctx, obj); @@ -233,15 +215,15 @@ public class RequestHandler { * Register Timeline API calls. */ private void registerTimelineAPI() { - app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.LAST_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LAST_INSTANT", 1); - List<InstantDTO> dtos = instantHandler.getLastInstant(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get()); + List<InstantDTO> dtos = instantHandler.getLastInstant(getBasePathParam(ctx)); writeValueAsString(ctx, dtos); }, false)); - app.get(RemoteHoodieTableFileSystemView.TIMELINE, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.TIMELINE_URL, new ViewHandler(ctx -> { metricsRegistry.add("TIMELINE", 1); - TimelineDTO dto = instantHandler.getTimeline(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).get()); + TimelineDTO dto = instantHandler.getTimeline(getBasePathParam(ctx)); writeValueAsString(ctx, dto); }, false)); } @@ -253,68 +235,66 @@ public class RequestHandler { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_DATA_FILES", 1); List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); - + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_DATA_FILE_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_DATA_FILE", 1); List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFile( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); + getBasePathParam(ctx), + getPartitionParam(ctx), + getFileIdParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.LATEST_ALL_DATA_FILES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_ALL_DATA_FILES", 1); - List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFiles(getBasePathParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILES_BEFORE_ON_INSTANT", 1); List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); + getBasePathParam(ctx), + getPartitionParam(ctx), + getMaxInstantParamMandatory(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT", 1); Map<String, List<BaseFileDTO>> dtos = dataFileHandler.getAllLatestDataFilesBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); + getBasePathParam(ctx), + getMaxInstantParamMandatory(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILE_ON_INSTANT", 1); List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFileOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), + getBasePathParam(ctx), + getPartitionParam(ctx), ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANT_PARAM, String.class).get(), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); + getFileIdParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.ALL_DATA_FILES_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_DATA_FILES", 1); List<BaseFileDTO> dtos = dataFileHandler.getAllDataFiles( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_DATA_FILES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_DATA_FILES_RANGE_INSTANT", 1); List<BaseFileDTO> dtos = dataFileHandler.getLatestDataFilesInRange( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(","))); + getBasePathParam(ctx), + getInstantsParam(ctx)); writeValueAsString(ctx, dtos); }, true)); } @@ -326,121 +306,116 @@ public class RequestHandler { app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICES", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlices( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICES_STATELESS_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICES_STATELESS", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesStateless( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_SLICE_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_SLICE", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlice( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.FILEID_PARAM, String.class).getOrThrow(e -> new HoodieException("FILEID is invalid"))); + getBasePathParam(ctx), + getPartitionParam(ctx), + getFileIdParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_PARTITION_UNCOMPACTED_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_PARTITION_UNCOMPACTED_SLICES", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestUnCompactedFileSlices( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_SLICES_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_SLICES", 1); List<FileSliceDTO> dtos = sliceHandler.getAllFileSlices( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_RANGE_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICE_RANGE_INSTANT", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestFileSliceInRange( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - Arrays.asList(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INSTANTS_PARAM, String.class).getOrThrow(e -> new HoodieException("INSTANTS_PARAM is invalid")).split(","))); + getBasePathParam(ctx), + getInstantsParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICES_MERGED_BEFORE_ON_INSTANT", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestMergedFileSlicesBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); + getBasePathParam(ctx), + getPartitionParam(ctx), + getMaxInstantParamMandatory(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("LATEST_SLICES_BEFORE_ON_INSTANT", 1); List<FileSliceDTO> dtos = sliceHandler.getLatestFileSlicesBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid")), - Boolean.parseBoolean( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM, String.class) - .getOrThrow(e -> new HoodieException("INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM is invalid")))); + getBasePathParam(ctx), + getPartitionParam(ctx), + getMaxInstantParamMandatory(ctx), + getIncludeFilesInPendingCompactionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_LATEST_SLICES_BEFORE_ON_INSTANT", 1); Map<String, List<FileSliceDTO>> dtos = sliceHandler.getAllLatestFileSlicesBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrThrow(e -> new HoodieException("MAX_INSTANT_PARAM is invalid"))); + getBasePathParam(ctx), + getMaxInstantParamMandatory(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.PENDING_COMPACTION_OPS_URL, new ViewHandler(ctx -> { metricsRegistry.add("PEDING_COMPACTION_OPS", 1); - List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + List<CompactionOpDTO> dtos = sliceHandler.getPendingCompactionOperations(getBasePathParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.PENDING_LOG_COMPACTION_OPS_URL, new ViewHandler(ctx -> { metricsRegistry.add("PEDING_LOG_COMPACTION_OPS", 1); - List<CompactionOpDTO> dtos = sliceHandler.getPendingLogCompactionOperations( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + List<CompactionOpDTO> dtos = sliceHandler.getPendingLogCompactionOperations(getBasePathParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION", 1); List<FileGroupDTO> dtos = sliceHandler.getAllFileGroups( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); app.get(RemoteHoodieTableFileSystemView.ALL_FILEGROUPS_FOR_PARTITION_STATELESS_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_FILEGROUPS_FOR_PARTITION_STATELESS", 1); List<FileGroupDTO> dtos = sliceHandler.getAllFileGroupsStateless( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE, new ViewHandler(ctx -> { + app.post(RemoteHoodieTableFileSystemView.REFRESH_TABLE_URL, new ViewHandler(ctx -> { metricsRegistry.add("REFRESH_TABLE", 1); - boolean success = sliceHandler - .refreshTable(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + boolean success = sliceHandler.refreshTable(getBasePathParam(ctx)); writeValueAsString(ctx, success); }, false)); app.post(RemoteHoodieTableFileSystemView.LOAD_PARTITIONS_URL, new ViewHandler(ctx -> { metricsRegistry.add("LOAD_PARTITIONS", 1); - String basePath = ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")); + String basePath = getBasePathParam(ctx); try { List<String> partitionPaths = OBJECT_MAPPER.readValue(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITIONS_PARAM, String.class) .getOrThrow(e -> new HoodieException("Partitions param is invalid")), LIST_TYPE_REFERENCE); @@ -453,50 +428,48 @@ public class RequestHandler { app.post(RemoteHoodieTableFileSystemView.LOAD_ALL_PARTITIONS_URL, new ViewHandler(ctx -> { metricsRegistry.add("LOAD_ALL_PARTITIONS", 1); - boolean success = sliceHandler - .loadAllPartitions(ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + boolean success = sliceHandler.loadAllPartitions(getBasePathParam(ctx)); writeValueAsString(ctx, success); }, false)); - app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON", 1); List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBeforeOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getMaxInstantParamOptional(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_BEFORE_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_BEFORE", 1); List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsBefore( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MAX_INSTANT_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getMaxInstantParamOptional(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_AFTER_OR_ON_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_AFTER_OR_ON", 1); List<FileGroupDTO> dtos = sliceHandler.getReplacedFileGroupsAfterOrOn( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.MIN_INSTANT_PARAM, String.class).getOrDefault(""), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getMinInstantParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.ALL_REPLACED_FILEGROUPS_PARTITION_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_REPLACED_FILEGROUPS_PARTITION", 1); List<FileGroupDTO> dtos = sliceHandler.getAllReplacedFileGroups( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid")), - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.PARTITION_PARAM, String.class).getOrDefault("")); + getBasePathParam(ctx), + getPartitionParam(ctx)); writeValueAsString(ctx, dtos); }, true)); - app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS, new ViewHandler(ctx -> { + app.get(RemoteHoodieTableFileSystemView.PENDING_CLUSTERING_FILEGROUPS_URL, new ViewHandler(ctx -> { metricsRegistry.add("PENDING_CLUSTERING_FILEGROUPS", 1); - List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering( - ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.BASEPATH_PARAM, String.class).getOrThrow(e -> new HoodieException("Basepath is invalid"))); + List<ClusteringOpDTO> dtos = sliceHandler.getFileGroupsInPendingClustering(getBasePathParam(ctx)); writeValueAsString(ctx, dtos); }, true)); } @@ -504,15 +477,13 @@ public class RequestHandler { private void registerMarkerAPI() { app.get(MarkerOperation.ALL_MARKERS_URL, new ViewHandler(ctx -> { metricsRegistry.add("ALL_MARKERS", 1); - Set<String> markers = markerHandler.getAllMarkers( - ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); + Set<String> markers = markerHandler.getAllMarkers(getMarkerDirParam(ctx)); writeValueAsString(ctx, markers); }, false)); app.get(MarkerOperation.CREATE_AND_MERGE_MARKERS_URL, new ViewHandler(ctx -> { metricsRegistry.add("CREATE_AND_MERGE_MARKERS", 1); - Set<String> markers = markerHandler.getCreateAndMergeMarkers( - ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); + Set<String> markers = markerHandler.getCreateAndMergeMarkers(getMarkerDirParam(ctx)); writeValueAsString(ctx, markers); }, false)); @@ -525,8 +496,7 @@ public class RequestHandler { app.get(MarkerOperation.MARKERS_DIR_EXISTS_URL, new ViewHandler(ctx -> { metricsRegistry.add("MARKERS_DIR_EXISTS", 1); - boolean exist = markerHandler.doesMarkerDirExist( - ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); + boolean exist = markerHandler.doesMarkerDirExist(getMarkerDirParam(ctx)); writeValueAsString(ctx, exist); }, false)); @@ -534,30 +504,18 @@ public class RequestHandler { metricsRegistry.add("CREATE_MARKER", 1); ctx.future(markerHandler.createMarker( ctx, - ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault(""), + getMarkerDirParam(ctx), ctx.queryParamAsClass(MarkerOperation.MARKER_NAME_PARAM, String.class).getOrDefault(""), ctx.queryParamAsClass(MarkerOperation.MARKER_BASEPATH_PARAM, String.class).getOrDefault(""))); }, false)); app.post(MarkerOperation.DELETE_MARKER_DIR_URL, new ViewHandler(ctx -> { metricsRegistry.add("DELETE_MARKER_DIR", 1); - boolean success = markerHandler.deleteMarkers( - ctx.queryParamAsClass(MarkerOperation.MARKER_DIR_PATH_PARAM, String.class).getOrDefault("")); + boolean success = markerHandler.deleteMarkers(getMarkerDirParam(ctx)); writeValueAsString(ctx, success); }, false)); } - /** - * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. - */ - private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String timelineHashFromClient) { - Option<HoodieInstant> lastInstant = localTimeline.lastInstant(); - // When performing async clean, we may have one more .clean.completed after lastInstantTs. - // In this case, we do not need to throw an exception. - return !lastInstant.isPresent() || !lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION) - || !localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient); - } - /** * Used for logging and performing refresh check. */ @@ -604,16 +562,13 @@ public class RequestHandler { if (refreshCheck) { long beginFinalCheck = System.currentTimeMillis(); if (isLocalViewBehind(context)) { - String lastKnownInstantFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); - String timelineHashFromClient = context.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault(""); + String lastKnownInstantFromClient = getLastInstantTsParam(context); + String timelineHashFromClient = getTimelineHashParam(context); HoodieTimeline localTimeline = viewManager.getFileSystemView(context.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM)).getTimeline(); if (shouldThrowExceptionIfLocalViewBehind(localTimeline, timelineHashFromClient)) { - String errMsg = - "Last known instant from client was " - + lastKnownInstantFromClient - + " but server has the following timeline " - + localTimeline.getInstants(); + String errMsg = String.format("Last known instant from client was %s but server has the following timeline %s", + lastKnownInstantFromClient, localTimeline.getInstants()); throw new BadRequestResponse(errMsg); } } @@ -623,9 +578,9 @@ public class RequestHandler { } catch (RuntimeException re) { success = false; if (re instanceof BadRequestResponse) { - LOG.warn("Bad request response due to client view behind server view. " + re.getMessage()); + LOG.warn("Bad request response due to client view behind server view. {}", re.getMessage()); } else { - LOG.error("Got runtime exception servicing request " + context.queryString(), re); + LOG.error(String.format("Got runtime exception servicing request %s", context.queryString()), re); } throw re; } finally { @@ -637,14 +592,85 @@ public class RequestHandler { metricsRegistry.add("TOTAL_CHECK_TIME", finalCheckTimeTaken); metricsRegistry.add("TOTAL_API_CALLS", 1); - LOG.debug(String.format( - "TimeTakenMillis[Total=%d, Refresh=%d, handle=%d, Check=%d], " - + "Success=%s, Query=%s, Host=%s, synced=%s", - timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, - context.queryString(), context.host(), synced)); + if (LOG.isDebugEnabled()) { + LOG.debug("TimeTakenMillis[Total={}, Refresh={}, handle={}, Check={}], Success={}, Query={}, Host={}, synced={}", + timeTakenMillis, refreshCheckTimeTaken, handleTimeTaken, finalCheckTimeTaken, success, context.queryString(), context.host(), synced); + } } return null; }); } + + /** + * Determines if local view of table's timeline is behind that of client's view. + */ + private boolean isLocalViewBehind(Context ctx) { + String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); + String lastKnownInstantFromClient = getLastInstantTsParam(ctx); + String timelineHashFromClient = getTimelineHashParam(ctx); + HoodieTimeline localTimeline = + viewManager.getFileSystemView(basePath).getTimeline().filterCompletedOrMajorOrMinorCompactionInstants(); + if (LOG.isDebugEnabled()) { + LOG.debug("Client [ LastTs={}, TimelineHash={}], localTimeline={}",lastKnownInstantFromClient, timelineHashFromClient, localTimeline.getInstants()); + } + + if ((!localTimeline.getInstantsAsStream().findAny().isPresent()) + && HoodieTimeline.INVALID_INSTANT_TS.equals(lastKnownInstantFromClient)) { + return false; + } + + String localTimelineHash = localTimeline.getTimelineHash(); + // refresh if timeline hash mismatches + if (!localTimelineHash.equals(timelineHashFromClient)) { + return true; + } + + // As a safety check, even if hash is same, ensure instant is present + return !localTimeline.containsOrBeforeTimelineStarts(lastKnownInstantFromClient); + } + + /** + * Syncs data-set view if local view is behind. + */ + private boolean syncIfLocalViewBehind(Context ctx) { + String basePath = ctx.queryParam(RemoteHoodieTableFileSystemView.BASEPATH_PARAM); + SyncableFileSystemView view = viewManager.getFileSystemView(basePath); + synchronized (view) { + if (isLocalViewBehind(ctx)) { + String lastKnownInstantFromClient = getLastInstantTsParam(ctx); + HoodieTimeline localTimeline = viewManager.getFileSystemView(basePath).getTimeline(); + if (LOG.isInfoEnabled()) { + LOG.info("Syncing view as client passed last known instant {} as last known instant but server has the following last instant on timeline: {}", + lastKnownInstantFromClient, localTimeline.lastInstant()); + } + view.sync(); + return true; + } + } + return false; + } + + /** + * Determine whether to throw an exception when local view of table's timeline is behind that of client's view. + */ + private boolean shouldThrowExceptionIfLocalViewBehind(HoodieTimeline localTimeline, String timelineHashFromClient) { + Option<HoodieInstant> lastInstant = localTimeline.lastInstant(); + // When performing async clean, we may have one more .clean.completed after lastInstantTs. + // In this case, we do not need to throw an exception. + return !lastInstant.isPresent() || !lastInstant.get().getAction().equals(HoodieTimeline.CLEAN_ACTION) + || !localTimeline.findInstantsBefore(lastInstant.get().getTimestamp()).getTimelineHash().equals(timelineHashFromClient); + } + + private boolean isRefreshCheckDisabledInQuery(Context ctx) { + return Boolean.parseBoolean(ctx.queryParam(RemoteHoodieTableFileSystemView.REFRESH_OFF)); + } + + private String getLastInstantTsParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.LAST_INSTANT_TS, String.class).getOrDefault(HoodieTimeline.INVALID_INSTANT_TS); + } + + private String getTimelineHashParam(Context ctx) { + return ctx.queryParamAsClass(RemoteHoodieTableFileSystemView.TIMELINE_HASH, String.class).getOrDefault(""); + } } }
