alexeykudinkin commented on a change in pull request #3986:
URL: https://github.com/apache/hudi/pull/3986#discussion_r749626727
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile
profile, HoodieSparkEng
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
-
- // smallFiles only for partitionPath
- List<SmallFile> smallFileLocations = new ArrayList<>();
-
// Init here since this class (and member variables) might not have been
initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
- // Find out all eligible small file slices
- if (!commitTimeline.empty()) {
- HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- // find smallest file in partition and append to it
- List<FileSlice> allSmallFileSlices = new ArrayList<>();
- // If we cannot index log files, then we choose the smallest parquet
file in the partition and add inserts to
- // it. Doing this overtime for a partition, we ensure that we handle
small file issues
- if (!table.getIndex().canIndexLogFiles()) {
- // TODO : choose last N small files since there can be multiple small
files written to a single partition
- // by different spark partitions in a single batch
- Option<FileSlice> smallFileSlice =
Option.fromJavaOptional(table.getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .filter(
- fileSlice -> fileSlice.getLogFiles().count() < 1 &&
fileSlice.getBaseFile().get().getFileSize() < config
- .getParquetSmallFileLimit())
- .min((FileSlice left, FileSlice right) ->
- left.getBaseFile().get().getFileSize() <
right.getBaseFile().get().getFileSize() ? -1 : 1));
- if (smallFileSlice.isPresent()) {
- allSmallFileSlices.add(smallFileSlice.get());
- }
+ if (commitTimeline.empty()) {
+ return Collections.emptyList();
+ }
+
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+ // Find out all eligible small file slices, looking for
+ // smallest file in the partition to append to
+ List<FileSlice> smallFileSlicesCandidates =
pickSmallFileCandidates(partitionPath, latestCommitTime);
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ // Create SmallFiles from the eligible file slices
+ for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+ SmallFile sf = new SmallFile();
+ if (smallFileSlice.getBaseFile().isPresent()) {
+ // TODO : Move logic of file name, file id, base commit time handling
inside file slice
+ String filename = smallFileSlice.getBaseFile().get().getFileName();
+ sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
} else {
- // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
- // pending compaction
- List<FileSlice> allFileSlices =
- table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .collect(Collectors.toList());
- for (FileSlice fileSlice : allFileSlices) {
- if (isSmallFile(fileSlice)) {
- allSmallFileSlices.add(fileSlice);
- }
- }
- }
- // Create SmallFiles from the eligible file slices
- for (FileSlice smallFileSlice : allSmallFileSlices) {
- SmallFile sf = new SmallFile();
- if (smallFileSlice.getBaseFile().isPresent()) {
- // TODO : Move logic of file name, file id, base commit time
handling inside file slice
- String filename = smallFileSlice.getBaseFile().get().getFileName();
- sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- } else {
- HoodieLogFile logFile =
smallFileSlice.getLogFiles().findFirst().get();
- sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
- FSUtils.getFileIdFromLogPath(logFile.getPath()));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- }
+ HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+ sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+ FSUtils.getFileIdFromLogPath(logFile.getPath()));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
}
}
return smallFileLocations;
}
+ @Nonnull
+ private List<FileSlice> pickSmallFileCandidates(String partitionPath,
HoodieInstant latestCommitInstant) {
+ // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
+ // pending compaction
+ if (table.getIndex().canIndexLogFiles()) {
+ return table.getSliceView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitInstant.getTimestamp(), false)
+ .filter(this::isSmallFile)
+ .collect(Collectors.toList());
+ }
+
+ // If we cannot index log files, then we choose the smallest parquet file
in the partition and add inserts to
+ // it. Doing this overtime for a partition, we ensure that we handle small
file issues
+ // TODO : choose last N small files since there can be multiple small
files written to a single partition
Review comment:
Good catch. I forgot to clean this up.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile
profile, HoodieSparkEng
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
-
- // smallFiles only for partitionPath
- List<SmallFile> smallFileLocations = new ArrayList<>();
-
// Init here since this class (and member variables) might not have been
initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
- // Find out all eligible small file slices
- if (!commitTimeline.empty()) {
- HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- // find smallest file in partition and append to it
- List<FileSlice> allSmallFileSlices = new ArrayList<>();
- // If we cannot index log files, then we choose the smallest parquet
file in the partition and add inserts to
- // it. Doing this overtime for a partition, we ensure that we handle
small file issues
- if (!table.getIndex().canIndexLogFiles()) {
- // TODO : choose last N small files since there can be multiple small
files written to a single partition
- // by different spark partitions in a single batch
- Option<FileSlice> smallFileSlice =
Option.fromJavaOptional(table.getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .filter(
- fileSlice -> fileSlice.getLogFiles().count() < 1 &&
fileSlice.getBaseFile().get().getFileSize() < config
- .getParquetSmallFileLimit())
- .min((FileSlice left, FileSlice right) ->
- left.getBaseFile().get().getFileSize() <
right.getBaseFile().get().getFileSize() ? -1 : 1));
- if (smallFileSlice.isPresent()) {
- allSmallFileSlices.add(smallFileSlice.get());
- }
+ if (commitTimeline.empty()) {
+ return Collections.emptyList();
+ }
+
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+ // Find out all eligible small file slices, looking for
+ // smallest file in the partition to append to
+ List<FileSlice> smallFileSlicesCandidates =
pickSmallFileCandidates(partitionPath, latestCommitTime);
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ // Create SmallFiles from the eligible file slices
+ for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+ SmallFile sf = new SmallFile();
+ if (smallFileSlice.getBaseFile().isPresent()) {
+ // TODO : Move logic of file name, file id, base commit time handling
inside file slice
+ String filename = smallFileSlice.getBaseFile().get().getFileName();
+ sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
} else {
- // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
- // pending compaction
- List<FileSlice> allFileSlices =
- table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .collect(Collectors.toList());
- for (FileSlice fileSlice : allFileSlices) {
- if (isSmallFile(fileSlice)) {
- allSmallFileSlices.add(fileSlice);
- }
- }
- }
- // Create SmallFiles from the eligible file slices
- for (FileSlice smallFileSlice : allSmallFileSlices) {
- SmallFile sf = new SmallFile();
- if (smallFileSlice.getBaseFile().isPresent()) {
- // TODO : Move logic of file name, file id, base commit time
handling inside file slice
- String filename = smallFileSlice.getBaseFile().get().getFileName();
- sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- } else {
- HoodieLogFile logFile =
smallFileSlice.getLogFiles().findFirst().get();
- sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
- FSUtils.getFileIdFromLogPath(logFile.getPath()));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- }
+ HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+ sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+ FSUtils.getFileIdFromLogPath(logFile.getPath()));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
}
}
return smallFileLocations;
}
+ @Nonnull
+ private List<FileSlice> pickSmallFileCandidates(String partitionPath,
HoodieInstant latestCommitInstant) {
+ // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
+ // pending compaction
+ if (table.getIndex().canIndexLogFiles()) {
+ return table.getSliceView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitInstant.getTimestamp(), false)
+ .filter(this::isSmallFile)
+ .collect(Collectors.toList());
+ }
+
Review comment:
I'm deliberately inverting those conditionals so that early returns can
replace some of the else blocks and reduce nesting.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java
##########
@@ -51,68 +52,70 @@ public SparkUpsertDeltaCommitPartitioner(WorkloadProfile
profile, HoodieSparkEng
@Override
protected List<SmallFile> getSmallFiles(String partitionPath) {
-
- // smallFiles only for partitionPath
- List<SmallFile> smallFileLocations = new ArrayList<>();
-
// Init here since this class (and member variables) might not have been
initialized
HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
- // Find out all eligible small file slices
- if (!commitTimeline.empty()) {
- HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
- // find smallest file in partition and append to it
- List<FileSlice> allSmallFileSlices = new ArrayList<>();
- // If we cannot index log files, then we choose the smallest parquet
file in the partition and add inserts to
- // it. Doing this overtime for a partition, we ensure that we handle
small file issues
- if (!table.getIndex().canIndexLogFiles()) {
- // TODO : choose last N small files since there can be multiple small
files written to a single partition
- // by different spark partitions in a single batch
- Option<FileSlice> smallFileSlice =
Option.fromJavaOptional(table.getSliceView()
- .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .filter(
- fileSlice -> fileSlice.getLogFiles().count() < 1 &&
fileSlice.getBaseFile().get().getFileSize() < config
- .getParquetSmallFileLimit())
- .min((FileSlice left, FileSlice right) ->
- left.getBaseFile().get().getFileSize() <
right.getBaseFile().get().getFileSize() ? -1 : 1));
- if (smallFileSlice.isPresent()) {
- allSmallFileSlices.add(smallFileSlice.get());
- }
+ if (commitTimeline.empty()) {
+ return Collections.emptyList();
+ }
+
+ HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
+
+ // Find out all eligible small file slices, looking for
+ // smallest file in the partition to append to
+ List<FileSlice> smallFileSlicesCandidates =
pickSmallFileCandidates(partitionPath, latestCommitTime);
+ List<SmallFile> smallFileLocations = new ArrayList<>();
+
+ // Create SmallFiles from the eligible file slices
+ for (FileSlice smallFileSlice : smallFileSlicesCandidates) {
+ SmallFile sf = new SmallFile();
+ if (smallFileSlice.getBaseFile().isPresent()) {
+ // TODO : Move logic of file name, file id, base commit time handling
inside file slice
+ String filename = smallFileSlice.getBaseFile().get().getFileName();
+ sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
} else {
- // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
- // pending compaction
- List<FileSlice> allFileSlices =
- table.getSliceView().getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitTime.getTimestamp(), false)
- .collect(Collectors.toList());
- for (FileSlice fileSlice : allFileSlices) {
- if (isSmallFile(fileSlice)) {
- allSmallFileSlices.add(fileSlice);
- }
- }
- }
- // Create SmallFiles from the eligible file slices
- for (FileSlice smallFileSlice : allSmallFileSlices) {
- SmallFile sf = new SmallFile();
- if (smallFileSlice.getBaseFile().isPresent()) {
- // TODO : Move logic of file name, file id, base commit time
handling inside file slice
- String filename = smallFileSlice.getBaseFile().get().getFileName();
- sf.location = new
HoodieRecordLocation(FSUtils.getCommitTime(filename),
FSUtils.getFileId(filename));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- } else {
- HoodieLogFile logFile =
smallFileSlice.getLogFiles().findFirst().get();
- sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
- FSUtils.getFileIdFromLogPath(logFile.getPath()));
- sf.sizeBytes = getTotalFileSize(smallFileSlice);
- smallFileLocations.add(sf);
- }
+ HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get();
+ sf.location = new
HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()),
+ FSUtils.getFileIdFromLogPath(logFile.getPath()));
+ sf.sizeBytes = getTotalFileSize(smallFileSlice);
+ smallFileLocations.add(sf);
}
}
return smallFileLocations;
}
+ @Nonnull
+ private List<FileSlice> pickSmallFileCandidates(String partitionPath,
HoodieInstant latestCommitInstant) {
+ // If we can index log files, we can add more inserts to log files for
fileIds NOT including those under
+ // pending compaction
+ if (table.getIndex().canIndexLogFiles()) {
+ return table.getSliceView()
+ .getLatestFileSlicesBeforeOrOn(partitionPath,
latestCommitInstant.getTimestamp(), false)
+ .filter(this::isSmallFile)
+ .collect(Collectors.toList());
+ }
+
Review comment:
Here's a [good example](https://softwareengineering.stackexchange.com/a/18473/326225)
of how early returns streamline control flow.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]