This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e31be  [HUDI-1685] keep updating current date for every batch (#2671)
f5e31be is described below

commit f5e31be0866848940510b91dd4e9cdc7500a9086
Author: Ankush Kanungo <[email protected]>
AuthorDate: Fri Mar 12 15:53:01 2021 -0800

    [HUDI-1685] keep updating current date for every batch (#2671)
---
 .../sources/helpers/DatePartitionPathSelector.java          | 13 ++++++-------
 .../sources/helpers/TestDatePartitionPathSelector.java      |  2 +-
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index c22657f..97106de 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
   private final String dateFormat;
   private final int datePartitionDepth;
   private final int numPrevDaysToList;
-  private final LocalDate fromDate;
-  private final LocalDate currentDate;
   private final int partitionsListParallelism;
 
   /** Configs supported. */
@@ -107,10 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
      */
     dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
     datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, 
DEFAULT_DATE_PARTITION_DEPTH);
-    // If not specified the current date is assumed by default.
-    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, 
LocalDate.now().toString()));
     numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
-    fromDate = currentDate.minusDays(numPrevDaysToList);
     partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, 
DEFAULT_PARTITIONS_LIST_PARALLELISM);
   }
 
@@ -118,6 +113,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
   public Pair<Option<String>, String> 
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
                                                                              
Option<String> lastCheckpointStr,
                                                                              
long sourceLimit) {
+    // If not specified the current date is assumed by default.
+    LocalDate currentDate = 
LocalDate.parse(props.getString(Config.CURRENT_DATE, 
LocalDate.now().toString()));
+
     // obtain all eligible files under root folder.
     LOG.info(
         "Root path => "
@@ -133,7 +131,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
     long lastCheckpointTime = 
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
     HoodieSparkEngineContext context = new 
HoodieSparkEngineContext(sparkContext);
     SerializableConfiguration serializedConf = new 
SerializableConfiguration(fs.getConf());
-    List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, 
props.getString(ROOT_INPUT_PATH_PROP));
+    List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, 
props.getString(ROOT_INPUT_PATH_PROP), currentDate);
 
     List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
         path -> {
@@ -173,7 +171,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
    * Prunes date level partitions to last few days configured by 
'NUM_PREV_DAYS_TO_LIST' from
    * 'CURRENT_DATE'. Parallelizes listing by leveraging 
HoodieSparkEngineContext's methods.
    */
-  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext 
context, FileSystem fs, String rootPath) {
+  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext 
context, FileSystem fs, String rootPath, LocalDate currentDate) {
     List<String> partitionPaths = new ArrayList<>();
     // get all partition paths before date partition level
     partitionPaths.add(rootPath);
@@ -199,6 +197,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
     // Prune date partitions to last few days
     return context.getJavaSparkContext().parallelize(partitionPaths, 
partitionsListParallelism)
         .filter(s -> {
+          LocalDate fromDate = currentDate.minusDays(numPrevDaysToList);
           String[] splits = s.split("/");
           String datePartition = splits[splits.length - 1];
           LocalDate partitionDate;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index 30d0993..1673660 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -210,7 +210,7 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
     createParentDirsBeforeDatePartitions(root, generateRandomStrings(), 
totalDepthBeforeDatePartitions, leafDirs);
     createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
 
-    List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, 
root.toString());
+    List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, 
root.toString(), LocalDate.parse(currentDate));
     assertEquals(expectedNumFiles, paths.size());
   }
 }

Reply via email to