alexeykudinkin commented on a change in pull request #3986:
URL: https://github.com/apache/hudi/pull/3986#discussion_r749626727



##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile 
profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been 
initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet 
file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle 
small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small 
files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = 
Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && 
fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < 
right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = 
pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling 
inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time 
handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = 
smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, 
HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+
+    // If we cannot index log files, then we choose the smallest parquet file 
in the partition and add inserts to
+    // it. Doing this overtime for a partition, we ensure that we handle small 
file issues
+    // TODO : choose last N small files since there can be multiple small 
files written to a single partition

Review comment:
       Good catch. I forgot to clean that up.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile 
profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been 
initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet 
file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle 
small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small 
files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = 
Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && 
fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < 
right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = 
pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling 
inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time 
handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = 
smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, 
HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+

Review comment:
       I'm deliberately inverting those conditionals so that each branch can 
return early; that lets me drop some of the else blocks and reduces nesting.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile 
profile, HoodieSparkEng
 
   @Override
   protected List<SmallFile> getSmallFiles(String partitionPath) {
-
-    // smallFiles only for partitionPath
-    List<SmallFile> smallFileLocations = new ArrayList<>();
-
     // Init here since this class (and member variables) might not have been 
initialized
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
-    // Find out all eligible small file slices
-    if (!commitTimeline.empty()) {
-      HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
-      // find smallest file in partition and append to it
-      List<FileSlice> allSmallFileSlices = new ArrayList<>();
-      // If we cannot index log files, then we choose the smallest parquet 
file in the partition and add inserts to
-      // it. Doing this overtime for a partition, we ensure that we handle 
small file issues
-      if (!table.getIndex().canIndexLogFiles()) {
-        // TODO : choose last N small files since there can be multiple small 
files written to a single partition
-        // by different spark partitions in a single batch
-        Option<FileSlice> smallFileSlice = 
Option.fromJavaOptional(table.getSliceView()
-            .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-            .filter(
-                fileSlice -> fileSlice.getLogFiles().count() < 1 && 
fileSlice.getBaseFile().get().getFileSize() < config
-                    .getParquetSmallFileLimit())
-            .min((FileSlice left, FileSlice right) ->
-                left.getBaseFile().get().getFileSize() < 
right.getBaseFile().get().getFileSize() ? -1 : 1));
-        if (smallFileSlice.isPresent()) {
-          allSmallFileSlices.add(smallFileSlice.get());
-        }
+    if (commitTimeline.empty()) {
+      return Collections.emptyList();
+    }
+
+    HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+    // Find out all eligible small file slices, looking for
+    // smallest file in the partition to append to
+    List<FileSlice> smallFileSlicesCandidates = 
pickSmallFileCandidates(partitionPath, latestCommitTime);
+    List<SmallFile> smallFileLocations = new ArrayList<>();
+
+    // Create SmallFiles from the eligible file slices
+    for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+      SmallFile sf = new SmallFile();
+      if (smallFileSlice.getBaseFile().isPresent()) {
+        // TODO : Move logic of file name, file id, base commit time handling 
inside file slice
+        String filename = smallFileSlice.getBaseFile().get().getFileName();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       } else {
-        // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
-        // pending compaction
-        List<FileSlice> allFileSlices =
-            table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitTime.getTimestamp(), false)
-                .collect(Collectors.toList());
-        for (FileSlice fileSlice : allFileSlices) {
-          if (isSmallFile(fileSlice)) {
-            allSmallFileSlices.add(fileSlice);
-          }
-        }
-      }
-      // Create SmallFiles from the eligible file slices
-      for (FileSlice smallFileSlice : allSmallFileSlices) {
-        SmallFile sf = new SmallFile();
-        if (smallFileSlice.getBaseFile().isPresent()) {
-          // TODO : Move logic of file name, file id, base commit time 
handling inside file slice
-          String filename = smallFileSlice.getBaseFile().get().getFileName();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getCommitTime(filename), 
FSUtils.getFileId(filename));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        } else {
-          HoodieLogFile logFile = 
smallFileSlice.getLogFiles().findFirst().get();
-          sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
-              FSUtils.getFileIdFromLogPath(logFile.getPath()));
-          sf.sizeBytes = getTotalFileSize(smallFileSlice);
-          smallFileLocations.add(sf);
-        }
+        HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+        sf.location = new 
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+            FSUtils.getFileIdFromLogPath(logFile.getPath()));
+        sf.sizeBytes = getTotalFileSize(smallFileSlice);
+        smallFileLocations.add(sf);
       }
     }
     return smallFileLocations;
   }
 
+  @Nonnull
+  private List<FileSlice> pickSmallFileCandidates(String partitionPath, 
HoodieInstant latestCommitInstant) {
+    // If we can index log files, we can add more inserts to log files for 
fileIds NOT including those under
+    // pending compaction
+    if (table.getIndex().canIndexLogFiles()) {
+      return table.getSliceView()
+              .getLatestFileSlicesBeforeOrOn(partitionPath, 
latestCommitInstant.getTimestamp(), false)
+              .filter(this::isSmallFile)
+              .collect(Collectors.toList());
+    }
+

Review comment:
       Here's a [good example](https://softwareengineering.stackexchange.com/a/18473/326225) 
of how early returns streamline control flow.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to