vinothchandar commented on a change in pull request #1009: [HUDI-308] Avoid
Renames for tracking state transitions of all actions on dataset
URL: https://github.com/apache/incubator-hudi/pull/1009#discussion_r348079867
##########
File path:
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -420,17 +427,50 @@ public String getCommitActionType() {
* @return List of Hoodie Instants generated
* @throws IOException in case of failure
*/
- public static List<HoodieInstant>
scanHoodieInstantsFromFileSystem(FileSystem fs, Path metaPath,
- Set<String> includedExtensions) throws IOException {
- return Arrays.stream(HoodieTableMetaClient.scanFiles(fs, metaPath, path ->
{
- // Include only the meta files with extensions that needs to be included
- String extension = FSUtils.getFileExtension(path.getName());
- return includedExtensions.contains(extension);
- })).sorted(Comparator.comparing(
- // Sort the meta-data by the instant time (first part of the file name)
- fileStatus -> FSUtils.getInstantTime(fileStatus.getPath().getName())))
- // create HoodieInstantMarkers from FileStatus, which extracts
properties
- .map(HoodieInstant::new).collect(Collectors.toList());
+ public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(
+ FileSystem fs, Path metaPath, Set<String> includedExtensions) throws
IOException {
+ return scanHoodieInstantsFromFileSystem(fs, metaPath, includedExtensions,
true);
+ }
+
+ /**
+ * Helper method to scan all hoodie-instant metafiles and construct
HoodieInstant objects
+ *
+ * @param fs FileSystem
+ * @param metaPath Meta Path where hoodie instants are present
+ * @param includedExtensions Included hoodie extensions
+ * @param excludeIntermediateStates If there are multiple states for the
same action instant,
+ * only include the highest state
+ * @return List of Hoodie Instants generated
+ * @throws IOException in case of failure
+ */
+ public static List<HoodieInstant> scanHoodieInstantsFromFileSystem(
+ FileSystem fs, Path metaPath, Set<String> includedExtensions, boolean
excludeIntermediateStates)
+ throws IOException {
+ Stream<HoodieInstant> instantStream = Arrays.stream(
+ HoodieTableMetaClient
+ .scanFiles(fs, metaPath, path -> {
+ // Include only the meta files with extensions that needs to be
included
+ String extension = FSUtils.getFileExtension(path.getName());
+ return includedExtensions.contains(extension);
+ })).map(HoodieInstant::new);
+
+ if (excludeIntermediateStates) {
+ // Remove intermediate states for each (ts,action) pair
+ instantStream = dedupeInstants(instantStream);
+ }
+ return instantStream.sorted().collect(Collectors.toList());
+ }
+
+ public static Stream<HoodieInstant> dedupeInstants(Stream<HoodieInstant>
instantStream) {
+ return instantStream.collect(Collectors.groupingBy(x ->
Pair.of(x.getTimestamp(),
+ x.getAction().equals(HoodieTimeline.COMPACTION_ACTION) ?
HoodieTimeline.COMMIT_ACTION : x.getAction())))
+ .entrySet().stream().map(e -> e.getValue().stream().reduce((x, y) -> {
Review comment:
+1 on using different names for x and y as well :) More descriptive names, please.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services