This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f5e31be [HUDI-1685] keep updating current date for every batch (#2671)
f5e31be is described below
commit f5e31be0866848940510b91dd4e9cdc7500a9086
Author: Ankush Kanungo <[email protected]>
AuthorDate: Fri Mar 12 15:53:01 2021 -0800
[HUDI-1685] keep updating current date for every batch (#2671)
---
.../sources/helpers/DatePartitionPathSelector.java | 13 ++++++-------
.../sources/helpers/TestDatePartitionPathSelector.java | 2 +-
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index c22657f..97106de 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -74,8 +74,6 @@ public class DatePartitionPathSelector extends DFSPathSelector {
private final String dateFormat;
private final int datePartitionDepth;
private final int numPrevDaysToList;
- private final LocalDate fromDate;
- private final LocalDate currentDate;
private final int partitionsListParallelism;
/** Configs supported. */
@@ -107,10 +105,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
*/
dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH,
DEFAULT_DATE_PARTITION_DEPTH);
- // If not specified the current date is assumed by default.
- currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
- fromDate = currentDate.minusDays(numPrevDaysToList);
partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM,
DEFAULT_PARTITIONS_LIST_PARALLELISM);
}
@@ -118,6 +113,9 @@ public class DatePartitionPathSelector extends DFSPathSelector {
public Pair<Option<String>, String>
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
Option<String> lastCheckpointStr,
long sourceLimit) {
+ // If not specified the current date is assumed by default.
+ LocalDate currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, LocalDate.now().toString()));
+
// obtain all eligible files under root folder.
LOG.info(
"Root path => "
@@ -133,7 +131,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
HoodieSparkEngineContext context = new
HoodieSparkEngineContext(sparkContext);
SerializableConfiguration serializedConf = new
SerializableConfiguration(fs.getConf());
- List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP));
+ List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, props.getString(ROOT_INPUT_PATH_PROP), currentDate);
List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
path -> {
@@ -173,7 +171,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
* Prunes date level partitions to last few days configured by
'NUM_PREV_DAYS_TO_LIST' from
* 'CURRENT_DATE'. Parallelizes listing by leveraging
HoodieSparkEngineContext's methods.
*/
- public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath) {
+ public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext context, FileSystem fs, String rootPath, LocalDate currentDate) {
List<String> partitionPaths = new ArrayList<>();
// get all partition paths before date partition level
partitionPaths.add(rootPath);
@@ -199,6 +197,7 @@ public class DatePartitionPathSelector extends DFSPathSelector {
// Prune date partitions to last few days
return context.getJavaSparkContext().parallelize(partitionPaths,
partitionsListParallelism)
.filter(s -> {
+ LocalDate fromDate = currentDate.minusDays(numPrevDaysToList);
String[] splits = s.split("/");
String datePartition = splits[splits.length - 1];
LocalDate partitionDate;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index 30d0993..1673660 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -210,7 +210,7 @@ public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
createParentDirsBeforeDatePartitions(root, generateRandomStrings(),
totalDepthBeforeDatePartitions, leafDirs);
createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
- List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString());
+ List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, root.toString(), LocalDate.parse(currentDate));
assertEquals(expectedNumFiles, paths.size());
}
}