satishkotha commented on a change in pull request #2809:
URL: https://github.com/apache/hudi/pull/2809#discussion_r620508454
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
##########
@@ -122,19 +135,48 @@ public static Path getNthParent(Path path, int n) {
return result;
}
+ /**
+ * Depending on the configs hoodie.%s.consume.pending.commits and
hoodie.%s.consume.commit of job
+ *
+ * (hoodie.<tableName>.consume.pending.commits,
hoodie.<tableName>.consume.commit) ->
+ * (true, validCommit) -> returns activeTimeline filtered until
validCommit
+ * (true, InValidCommit) -> Raises HoodieIOException
+ * (true, notSet) -> Raises HoodieIOException
+ * (false, validCommit) -> returns compeltedTimeline filtered
until validCommit
+ * (false, InValidCommit) -> Raises HoodieIOException
+ * (false or notSet, notSet) -> returns completedTimeline unfiltered
+ *
+ * validCommit is one which exists in the timeline being checked and
vice versa
+ *
+ * @param tableName
+ * @param job
+ * @param metaClient
+ * @return
+ */
public static HoodieTimeline getTableTimeline(final String tableName, final
JobConf job, final HoodieTableMetaClient metaClient) {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline();
+
boolean includePendingCommits =
job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
+ String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT,
tableName));
+
if (includePendingCommits) {
- HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline();
- String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT,
tableName));
- if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
- LOG.info("Timestamp configured for validation: " + maxCommit + "
commits timeline:" + timeline + " table: " + tableName);
- throw new HoodieIOException("Valid timestamp is required for " +
HOODIE_CONSUME_COMMIT + " in validate mode");
- }
- return timeline.findInstantsBeforeOrEquals(maxCommit);
+ return filterIfInstantExists(tableName, timeline, maxCommit);
+ }
+
+ timeline = timeline.filterCompletedInstants();
+ if (maxCommit != null) {
+ return filterIfInstantExists(tableName, timeline, maxCommit);
}
// by default return all completed commits.
- return
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+ return timeline;
+ }
+
+ private static HoodieTimeline filterIfInstantExists(String tableName,
HoodieTimeline timeline, String maxCommit) {
+ if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
+ LOG.info("Timestamp configured for validation: " + maxCommit + " commits
timeline:" + timeline + " table: " + tableName);
Review comment:
minor: could you update this log message and the error message below it?
(Neither is specific to validate mode any longer.)
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -438,11 +437,20 @@ public static HoodieMetadataConfig
buildMetadataConfig(Configuration conf) {
if (LOG.isDebugEnabled()) {
LOG.debug("Hoodie Metadata initialized with completed commit instant
as :" + metaClient);
}
-
HoodieTimeline timeline =
HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(),
job, metaClient);
+
Review comment:
minor: this file seems to have only newline changes. Can we reset it to
match master (unless you have a strong opinion on the newlines here)?
##########
File path:
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
##########
@@ -122,19 +135,48 @@ public static Path getNthParent(Path path, int n) {
return result;
}
+ /**
+ * Depending on the configs hoodie.%s.consume.pending.commits and
hoodie.%s.consume.commit of job
+ *
+ * (hoodie.<tableName>.consume.pending.commits,
hoodie.<tableName>.consume.commit) ->
+ * (true, validCommit) -> returns activeTimeline filtered until
validCommit
+ * (true, InValidCommit) -> Raises HoodieIOException
+ * (true, notSet) -> Raises HoodieIOException
+ * (false, validCommit) -> returns compeltedTimeline filtered
until validCommit
+ * (false, InValidCommit) -> Raises HoodieIOException
+ * (false or notSet, notSet) -> returns completedTimeline unfiltered
+ *
+ * validCommit is one which exists in the timeline being checked and
vice versa
+ *
+ * @param tableName
+ * @param job
+ * @param metaClient
+ * @return
+ */
public static HoodieTimeline getTableTimeline(final String tableName, final
JobConf job, final HoodieTableMetaClient metaClient) {
+ HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline();
+
boolean includePendingCommits =
job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
+ String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT,
tableName));
+
if (includePendingCommits) {
- HoodieTimeline timeline =
metaClient.getActiveTimeline().getCommitsTimeline();
- String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT,
tableName));
- if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
- LOG.info("Timestamp configured for validation: " + maxCommit + "
commits timeline:" + timeline + " table: " + tableName);
- throw new HoodieIOException("Valid timestamp is required for " +
HOODIE_CONSUME_COMMIT + " in validate mode");
- }
- return timeline.findInstantsBeforeOrEquals(maxCommit);
+ return filterIfInstantExists(tableName, timeline, maxCommit);
+ }
+
+ timeline = timeline.filterCompletedInstants();
Review comment:
minor: I'd suggest reorganizing lines 162-168 to make them more readable:
```
if (!includePendingCommits) {
  timeline = timeline.filterCompletedInstants();
}
return filterIfInstantExists(tableName, timeline, maxCommit);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org