alexeykudinkin commented on a change in pull request #4556:
URL: https://github.com/apache/hudi/pull/4556#discussion_r791356926
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
Review comment:
The moved methods aren't actually being changed here — they were only relocated.
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)}
operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to
the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
+ * partitions and then filtering based on the commits of interest, this
logic first extracts the
+ * partitions touched by the desired commits and then lists only those
partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+
HoodieTableMetaClient tableMetaClient,
+ List<Path>
inputPaths,
+ String
incrementalTable) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline =
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck =
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName,
timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths =
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(),
tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ protected abstract boolean includeLogFilesForSnapshotView();
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
+
Stream<HoodieLogFile> logFiles,
+
Option<HoodieInstant> latestCompletedInstantOpt,
+
HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles =
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ FileStatus baseFileStatus = getFileStatusUnchecked(baseFile);
+ try {
+ RealtimeFileStatus rtFileStatus = new RealtimeFileStatus(baseFileStatus);
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBaseFilePath(baseFile.getPath());
+ rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+ if (latestCompletedInstantOpt.isPresent()) {
+ HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+ checkState(latestCompletedInstant.isCompleted());
+
+ rtFileStatus.setMaxCommitTime(latestCompletedInstant.getTimestamp());
+ }
+
+ if (baseFileStatus instanceof LocatedFileStatusWithBootstrapBaseFile ||
baseFileStatus instanceof FileStatusWithBootstrapBaseFile) {
+ rtFileStatus.setBootStrapFileStatus(baseFileStatus);
+ }
+
+ return rtFileStatus;
+ } catch (IOException e) {
+ throw new HoodieIOException(String.format("Failed to init %s",
RealtimeFileStatus.class.getSimpleName()), e);
+ }
+ }
+
+ @Nonnull
+ private List<FileStatus> listStatusForSnapshotModeLegacy(JobConf job,
Map<String, HoodieTableMetaClient> tableMetaClientMap, List<Path>
snapshotPaths) throws IOException {
+ return HoodieInputFormatUtils.filterFileStatusForSnapshotMode(job,
tableMetaClientMap, snapshotPaths, includeLogFilesForSnapshotView());
+ }
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieLogFile latestLogFile,
+
Stream<HoodieLogFile> logFiles,
+
Option<HoodieInstant> latestCompletedInstantOpt,
+
HoodieTableMetaClient tableMetaClient) {
+ List<HoodieLogFile> sortedLogFiles =
logFiles.sorted(HoodieLogFile.getLogFileComparator()).collect(Collectors.toList());
+ try {
+ RealtimeFileStatus rtFileStatus = new
RealtimeFileStatus(latestLogFile.getFileStatus());
+ rtFileStatus.setDeltaLogFiles(sortedLogFiles);
+ rtFileStatus.setBasePath(tableMetaClient.getBasePath());
+
+ if (latestCompletedInstantOpt.isPresent()) {
+ HoodieInstant latestCompletedInstant = latestCompletedInstantOpt.get();
+ checkState(latestCompletedInstant.isCompleted());
Review comment:
I think we should control all assertions centrally (i.e., be able to toggle
all of the assertions on/off holistically), and I would prefer to keep this
assertion (since it doesn't bring any performance penalty).
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,127 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
+ List<FileStatus> diff = CollectionUtils.diff(targetFiles,
legacyFileStatuses);
+ checkState(diff.isEmpty(), "Should be empty");
+ }
+
+ @Nonnull
+ private static FileStatus getFileStatusUnchecked(HoodieBaseFile baseFile) {
+ try {
+ return HoodieInputFormatUtils.getFileStatus(baseFile);
+ } catch (IOException ioe) {
+ throw new HoodieIOException("Failed to get file-status", ioe);
+ }
+ }
+
+ /**
+ * Abstracts and exposes {@link FileInputFormat#listStatus(JobConf)}
operation to subclasses that
+ * lists files (returning an array of {@link FileStatus}) corresponding to
the input paths specified
+ * as part of provided {@link JobConf}
+ */
+ protected final FileStatus[] doListStatus(JobConf job) throws IOException {
+ return super.listStatus(job);
+ }
+
+ /**
+ * Achieves listStatus functionality for an incrementally queried table.
Instead of listing all
+ * partitions and then filtering based on the commits of interest, this
logic first extracts the
+ * partitions touched by the desired commits and then lists only those
partitions.
+ */
+ protected List<FileStatus> listStatusForIncrementalMode(JobConf job,
+
HoodieTableMetaClient tableMetaClient,
+ List<Path>
inputPaths,
+ String
incrementalTable) throws IOException {
+ String tableName = tableMetaClient.getTableConfig().getTableName();
+ Job jobContext = Job.getInstance(job);
+ Option<HoodieTimeline> timeline =
HoodieInputFormatUtils.getFilteredCommitsTimeline(jobContext, tableMetaClient);
+ if (!timeline.isPresent()) {
+ return null;
+ }
+ Option<List<HoodieInstant>> commitsToCheck =
HoodieInputFormatUtils.getCommitsForIncrementalQuery(jobContext, tableName,
timeline.get());
+ if (!commitsToCheck.isPresent()) {
+ return null;
+ }
+ Option<String> incrementalInputPaths =
HoodieInputFormatUtils.getAffectedPartitions(commitsToCheck.get(),
tableMetaClient, timeline.get(), inputPaths);
+ // Mutate the JobConf to set the input paths to only partitions touched by
incremental pull.
+ if (!incrementalInputPaths.isPresent()) {
+ return null;
+ }
+ setInputPaths(job, incrementalInputPaths.get());
+ FileStatus[] fileStatuses = doListStatus(job);
+ return HoodieInputFormatUtils.filterIncrementalFileStatus(jobContext,
tableMetaClient, timeline.get(), fileStatuses, commitsToCheck.get());
+ }
+
+ protected abstract boolean includeLogFilesForSnapshotView();
+
+ @Nonnull
+ private static RealtimeFileStatus
createRealtimeFileStatusUnchecked(HoodieBaseFile baseFile,
Review comment:
These are simple ctor-like methods that are private in scope. What would
you like to see as a description for them?
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/PathWithLogFilePath.java
##########
@@ -31,7 +31,7 @@
*/
public class PathWithLogFilePath extends Path {
// a flag to mark this split is produced by incremental query or not.
- private boolean belongToIncrementalPath = false;
+ private boolean belongsToIncrementalPath = false;
Review comment:
That would unfortunately double the number of stacked PRs -- I'm not taking up
any major renamings, other than touching a few files at most (this one in
particular to bring consistency to the naming of corresponding fields across classes).
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -143,6 +121,124 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
return returns.toArray(new FileStatus[0]);
}
+ private void validate(List<FileStatus> targetFiles, List<FileStatus>
legacyFileStatuses) {
Review comment:
The reason for re-ordering is to maintain the invariant that methods are
ordered from loose to strict access (with static methods kept in the same order,
after all non-static methods). This makes it easier to analyze what's present
in which part of the hierarchy and to make sure the methods are structured
correctly.
Apologies for the jitter in the review.
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
- public static InputSplit[] getRealtimeSplits(Configuration conf,
Stream<FileSplit> fileSplits) {
+ public static InputSplit[] getRealtimeSplits(Configuration conf,
List<FileSplit> fileSplits) throws IOException {
Review comment:
Simply limiting the number of changes to avoid changing too many things
at the same time.
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeBootstrapBaseFileSplit.java
##########
@@ -43,17 +43,20 @@
private String basePath;
- public RealtimeBootstrapBaseFileSplit() {
- super();
- }
Review comment:
We should certainly clean up such unnecessary clauses
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileInputFormatBase.java
##########
@@ -187,13 +286,28 @@ private static FileStatus
getFileStatusUnchecked(Option<HoodieBaseFile> baseFile
.map(fileSlice -> {
Option<HoodieBaseFile> baseFileOpt = fileSlice.getBaseFile();
Option<HoodieLogFile> latestLogFileOpt =
fileSlice.getLatestLogFile();
- if (baseFileOpt.isPresent()) {
- return getFileStatusUnchecked(baseFileOpt);
- } else if (includeLogFilesForSnapShotView() &&
latestLogFileOpt.isPresent()) {
- return
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(),
fileSlice.getLogFiles());
+ Stream<HoodieLogFile> logFiles = fileSlice.getLogFiles();
+
+ Option<HoodieInstant> latestCompletedInstantOpt =
+ fromScala(fileIndex.latestCompletedInstant());
+
+ // Check if we're reading a MOR table
+ if (includeLogFilesForSnapshotView()) {
+ if (baseFileOpt.isPresent()) {
+ return
createRealtimeFileStatusUnchecked(baseFileOpt.get(), logFiles,
latestCompletedInstantOpt, tableMetaClient);
+ } else if (latestLogFileOpt.isPresent()) {
+ return
createRealtimeFileStatusUnchecked(latestLogFileOpt.get(), logFiles,
latestCompletedInstantOpt, tableMetaClient);
+ } else {
+ throw new IllegalStateException("Invalid state: either
base-file or log-file has to be present");
+ }
} else {
- throw new IllegalStateException("Invalid state: either
base-file or log-file should be present");
+ if (baseFileOpt.isPresent()) {
+ return getFileStatusUnchecked(baseFileOpt.get());
+ } else {
+ throw new IllegalStateException("Invalid state: base-file
has to be present");
+ }
}
Review comment:
`listStatusForSnapshotMode()` is used for both COW and MOR, and whether
we're handling COW or MOR is controlled by `includeLogFilesForSnapshotView` (I will
clean this up in a follow-up, cleaning up some of the assertions after the majority of the
PRs land)
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -144,28 +204,32 @@
return rtSplits.toArray(new InputSplit[0]);
}
+ /**
+ * @deprecated will be replaced w/ {@link #getRealtimeSplits(Configuration,
List)}
+ */
// get IncrementalRealtimeSplits
- public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf,
Stream<FileSplit> fileSplits) throws IOException {
+ public static InputSplit[] getIncrementalRealtimeSplits(Configuration conf,
List<FileSplit> fileSplits) throws IOException {
+
checkState(fileSplits.stream().allMatch(HoodieRealtimeInputFormatUtils::doesBelongToIncrementalQuery),
+ "All splits have to belong to incremental query");
+
List<InputSplit> rtSplits = new ArrayList<>();
- List<FileSplit> fileSplitList = fileSplits.collect(Collectors.toList());
- Set<Path> partitionSet = fileSplitList.stream().map(f ->
f.getPath().getParent()).collect(Collectors.toSet());
+ Set<Path> partitionSet = fileSplits.stream().map(f ->
f.getPath().getParent()).collect(Collectors.toSet());
Map<Path, HoodieTableMetaClient> partitionsToMetaClient =
getTableMetaClientByPartitionPath(conf, partitionSet);
// Pre process tableConfig from first partition to fetch virtual key info
Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
if (partitionSet.size() > 0) {
hoodieVirtualKeyInfo =
getHoodieVirtualKeyInfo(partitionsToMetaClient.get(partitionSet.iterator().next()));
}
Option<HoodieVirtualKeyInfo> finalHoodieVirtualKeyInfo =
hoodieVirtualKeyInfo;
- fileSplitList.stream().forEach(s -> {
+ fileSplits.stream().forEach(s -> {
// deal with incremental query.
try {
if (s instanceof BaseFileWithLogsSplit) {
- BaseFileWithLogsSplit bs = (BaseFileWithLogsSplit)s;
- if (bs.getBelongToIncrementalSplit()) {
- rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(),
bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
- }
+ BaseFileWithLogsSplit bs = unsafeCast(s);
+ rtSplits.add(new HoodieRealtimeFileSplit(bs, bs.getBasePath(),
bs.getDeltaLogFiles(), bs.getMaxCommitTime(), finalHoodieVirtualKeyInfo));
} else if (s instanceof RealtimeBootstrapBaseFileSplit) {
- rtSplits.add(s);
+ RealtimeBootstrapBaseFileSplit bs = unsafeCast(s);
Review comment:
This is Java, sir
:-)
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
##########
@@ -65,11 +65,71 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.TypeUtils.unsafeCast;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
private static final Logger LOG =
LogManager.getLogger(HoodieRealtimeInputFormatUtils.class);
- public static InputSplit[] getRealtimeSplits(Configuration conf,
Stream<FileSplit> fileSplits) {
+ public static InputSplit[] getRealtimeSplits(Configuration conf,
List<FileSplit> fileSplits) throws IOException {
+ if (fileSplits.isEmpty()) {
+ return new InputSplit[0];
+ }
+
+ FileSplit fileSplit = fileSplits.get(0);
+
+ // Pre-process table-config to fetch virtual key info
+ Path partitionPath = fileSplit.getPath().getParent();
+ HoodieTableMetaClient metaClient =
getTableMetaClientForBasePathUnchecked(conf, partitionPath);
+
+ Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfoOpt =
getHoodieVirtualKeyInfo(metaClient);
+
+ // NOTE: This timeline is kept in sync w/ {@code HoodieTableFileIndexBase}
+ HoodieInstant latestCommitInstant =
+
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get();
+
+ InputSplit[] finalSplits = fileSplits.stream()
+ .map(split -> {
+ // There are 4 types of splits could we have to handle here
+ // - {@code BootstrapBaseFileSplit}: in case base file does have
associated bootstrap file,
+ // but does NOT have any log files appended (convert it to {@code
RealtimeBootstrapBaseFileSplit})
+ // - {@code RealtimeBootstrapBaseFileSplit}: in case base file does
have associated bootstrap file
+ // and does have log files appended
+ // - {@code BaseFileWithLogsSplit}: in case base file does NOT have
associated bootstrap file
Review comment:
This will be handled by `else` block
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieRealtimeFileSplit.java
##########
@@ -44,9 +44,7 @@
private Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo = Option.empty();
- public HoodieRealtimeFileSplit() {
- super();
Review comment:
Superfluous. It's done by default
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -440,6 +437,9 @@ public static HoodieMetadataConfig
buildMetadataConfig(Configuration conf) {
.build();
}
+ /**
+ * @deprecated
+ */
Review comment:
Method will be cleaned up altogether by HUDI-3280
##########
File path:
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
##########
@@ -516,8 +556,6 @@ void testMORTableRestore(boolean restoreAfterCompaction)
throws Exception {
JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 1);
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords,
newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
- List<WriteStatus> statuses = writeStatusJavaRDD.collect();
- assertNoWriteErrors(statuses);
Review comment:
We can't actually keep this check, since we can't dereference
the RDD twice.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]