satishkotha commented on a change in pull request #2809:
URL: https://github.com/apache/hudi/pull/2809#discussion_r620508454



##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
##########
@@ -122,19 +135,48 @@ public static Path getNthParent(Path path, int n) {
     return result;
   }
 
+  /**
+   * Depending on the configs hoodie.%s.consume.pending.commits and 
hoodie.%s.consume.commit of job
+   *
+   * (hoodie.<tableName>.consume.pending.commits, 
hoodie.<tableName>.consume.commit) ->
+   *      (true, validCommit)       -> returns activeTimeline filtered until 
validCommit
+   *      (true, InValidCommit)     -> Raises HoodieIOException
+   *      (true, notSet)            -> Raises HoodieIOException
+   *      (false, validCommit)      -> returns compeltedTimeline filtered 
until validCommit
+   *      (false, InValidCommit)    -> Raises HoodieIOException
+   *      (false or notSet, notSet) -> returns completedTimeline unfiltered
+   *
+   *      validCommit is one which exists in the timeline being checked and 
vice versa
+   *
+   * @param tableName
+   * @param job
+   * @param metaClient
+   * @return
+   */
   public static HoodieTimeline getTableTimeline(final String tableName, final 
JobConf job, final HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline();
+
     boolean includePendingCommits = 
job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
+    String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, 
tableName));
+
     if (includePendingCommits) {
-      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline();
-      String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, 
tableName));
-      if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
-        LOG.info("Timestamp configured for validation: " + maxCommit + " 
commits timeline:" + timeline + " table: " + tableName);
-        throw new HoodieIOException("Valid timestamp is required for " + 
HOODIE_CONSUME_COMMIT + " in validate mode");
-      }
-      return timeline.findInstantsBeforeOrEquals(maxCommit);
+      return filterIfInstantExists(tableName, timeline, maxCommit);
+    }
+
+    timeline = timeline.filterCompletedInstants();
+    if (maxCommit != null) {
+      return filterIfInstantExists(tableName, timeline, maxCommit);
     }
 
     // by default return all completed commits.
-    return 
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();
+    return timeline;
+  }
+
+  private static HoodieTimeline filterIfInstantExists(String tableName, 
HoodieTimeline timeline, String maxCommit) {
+    if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
+      LOG.info("Timestamp configured for validation: " + maxCommit + " commits 
timeline:" + timeline + " table: " + tableName);

Review comment:
       minor: could you update log message and below error message (This is no 
longer related to validate?)

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
##########
@@ -438,11 +437,20 @@ public static HoodieMetadataConfig 
buildMetadataConfig(Configuration conf) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Hoodie Metadata initialized with completed commit instant 
as :" + metaClient);
         }
-
         HoodieTimeline timeline = 
HoodieHiveUtils.getTableTimeline(metaClient.getTableConfig().getTableName(), 
job, metaClient);
+

Review comment:
       minor: this file seems to only have new line changes, can we reset this 
file to look like master (unless you have strong opinion on new lines here)?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieHiveUtils.java
##########
@@ -122,19 +135,48 @@ public static Path getNthParent(Path path, int n) {
     return result;
   }
 
+  /**
+   * Depending on the configs hoodie.%s.consume.pending.commits and 
hoodie.%s.consume.commit of job
+   *
+   * (hoodie.<tableName>.consume.pending.commits, 
hoodie.<tableName>.consume.commit) ->
+   *      (true, validCommit)       -> returns activeTimeline filtered until 
validCommit
+   *      (true, InValidCommit)     -> Raises HoodieIOException
+   *      (true, notSet)            -> Raises HoodieIOException
+   *      (false, validCommit)      -> returns compeltedTimeline filtered 
until validCommit
+   *      (false, InValidCommit)    -> Raises HoodieIOException
+   *      (false or notSet, notSet) -> returns completedTimeline unfiltered
+   *
+   *      validCommit is one which exists in the timeline being checked and 
vice versa
+   *
+   * @param tableName
+   * @param job
+   * @param metaClient
+   * @return
+   */
   public static HoodieTimeline getTableTimeline(final String tableName, final 
JobConf job, final HoodieTableMetaClient metaClient) {
+    HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline();
+
     boolean includePendingCommits = 
job.getBoolean(String.format(HOODIE_CONSUME_PENDING_COMMITS, tableName), false);
+    String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, 
tableName));
+
     if (includePendingCommits) {
-      HoodieTimeline timeline = 
metaClient.getActiveTimeline().getCommitsTimeline();
-      String maxCommit = job.get(String.format(HOODIE_CONSUME_COMMIT, 
tableName));
-      if (maxCommit == null || !timeline.containsInstant(maxCommit)) {
-        LOG.info("Timestamp configured for validation: " + maxCommit + " 
commits timeline:" + timeline + " table: " + tableName);
-        throw new HoodieIOException("Valid timestamp is required for " + 
HOODIE_CONSUME_COMMIT + " in validate mode");
-      }
-      return timeline.findInstantsBeforeOrEquals(maxCommit);
+      return filterIfInstantExists(tableName, timeline, maxCommit);
+    }
+
+    timeline = timeline.filterCompletedInstants();

Review comment:
       minor: I'd suggest reorganizing 162-168 to make it more readable
   
   
   `if (!includePendingCommits) {
   timeline = timeline.filterCompletedInstants();
   }`
   `return filterIfInstantExists(tableName, timeline, maxCommit)`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to