This is an automated email from the ASF dual-hosted git repository.
xushiyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f53bca4 [HUDI-1655] Support custom date format and fix unsupported
exception in DatePartitionPathSelector (#2621)
f53bca4 is described below
commit f53bca404f1482e0e99ad683dd29bfaff8bfb8ab
Author: Raymond Xu <[email protected]>
AuthorDate: Thu Mar 4 21:01:51 2021 -0800
[HUDI-1655] Support custom date format and fix unsupported exception in
DatePartitionPathSelector (#2621)
- Add a config to allow parsing a custom date format in
`DatePartitionPathSelector`. Currently it assumes the date partition string is
in the format `yyyy-MM-dd`.
- Fix a bug where `UnsupportedOperationException` was thrown when sorting
`eligibleFiles` in-place. Changed to sort it and store the result in a new list.
---
.../sources/helpers/DatePartitionPathSelector.java | 30 +++++++++++------
.../helpers/TestDatePartitionPathSelector.java | 38 ++++++++++++----------
2 files changed, 41 insertions(+), 27 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
index 2cedb6c..c22657f 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -35,13 +35,16 @@ import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaSparkContext;
import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import static
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_FORMAT;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
@@ -59,12 +62,16 @@ import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelecto
* <p>The date based partition is expected to be of the format '<date
string>=yyyy-mm-dd' or
* 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition
path can be of the
* form
`<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
- * `<basepath>/<<date-based-partition>/`
+ * `<basepath>/<<date-based-partition>/`.
+ *
+ * <p>The date based partition format can be configured via this property
+ * hoodie.deltastreamer.source.dfs.datepartitioned.date.format
*/
public class DatePartitionPathSelector extends DFSPathSelector {
private static volatile Logger LOG =
LogManager.getLogger(DatePartitionPathSelector.class);
+ private final String dateFormat;
private final int datePartitionDepth;
private final int numPrevDaysToList;
private final LocalDate fromDate;
@@ -73,6 +80,9 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
/** Configs supported. */
public static class Config {
+ public static final String DATE_FORMAT =
"hoodie.deltastreamer.source.dfs.datepartitioned.date.format";
+ public static final String DEFAULT_DATE_FORMAT = "yyyy-MM-dd";
+
public static final String DATE_PARTITION_DEPTH =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no
(date) partition
@@ -84,7 +94,6 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
public static final String CURRENT_DATE =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
-
public static final String PARTITIONS_LIST_PARALLELISM =
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
@@ -96,6 +105,7 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
* datePartitionDepth = 0 is same as basepath and there is no partition.
In which case
* this path selector would be a no-op and lists all paths under the table
basepath.
*/
+ dateFormat = props.getString(DATE_FORMAT, DEFAULT_DATE_FORMAT);
datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH,
DEFAULT_DATE_PARTITION_DEPTH);
// If not specified the current date is assumed by default.
currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE,
LocalDate.now().toString()));
@@ -130,20 +140,19 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
return listEligibleFiles(fs, new Path(path),
lastCheckpointTime).stream();
}, partitionsListParallelism);
- // sort them by modification time.
-
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+ // sort them by modification time ascending.
+ List<FileStatus> sortedEligibleFiles = eligibleFiles.stream()
+
.sorted(Comparator.comparingLong(FileStatus::getModificationTime)).collect(Collectors.toList());
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
- long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
- for (FileStatus f : eligibleFiles) {
+ for (FileStatus f : sortedEligibleFiles) {
if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
}
- maxModificationTime = f.getModificationTime();
currentBytes += f.getLen();
filteredFiles.add(f);
}
@@ -156,7 +165,7 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
// read the files out.
String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
-
+ long maxModificationTime = filteredFiles.get(filteredFiles.size() -
1).getModificationTime();
return new ImmutablePair<>(Option.ofNullable(pathStr),
String.valueOf(maxModificationTime));
}
@@ -193,14 +202,15 @@ public class DatePartitionPathSelector extends
DFSPathSelector {
String[] splits = s.split("/");
String datePartition = splits[splits.length - 1];
LocalDate partitionDate;
+ DateTimeFormatter dateFormatter =
DateTimeFormatter.ofPattern(dateFormat);
if (datePartition.contains("=")) {
String[] moreSplit = datePartition.split("=");
ValidationUtils.checkArgument(
moreSplit.length == 2,
"Partition Field (" + datePartition + ") not in expected
format");
- partitionDate = LocalDate.parse(moreSplit[1]);
+ partitionDate = LocalDate.parse(moreSplit[1], dateFormatter);
} else {
- partitionDate = LocalDate.parse(datePartition);
+ partitionDate = LocalDate.parse(datePartition, dateFormatter);
}
return (partitionDate.isEqual(fromDate) ||
partitionDate.isAfter(fromDate))
&& (partitionDate.isEqual(currentDate) ||
partitionDate.isBefore(currentDate));
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
index b7e1279..30d0993 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -33,6 +33,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
import java.time.LocalDate;
+import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
@@ -40,6 +41,7 @@ import java.util.stream.Stream;
import static
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_FORMAT;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -78,11 +80,11 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
/*
* Create Date partitions with some files under each of the leaf Dirs.
*/
- public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean
hiveStyle)
+ public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean
hiveStyle, String dateFormat)
throws IOException {
List<Path> allFiles = new ArrayList<>();
for (Path path : leafDirs) {
- List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+ List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle,
dateFormat);
for (Path datePartition : datePartitions) {
allFiles.addAll(createRandomFilesUnder(datePartition));
}
@@ -126,11 +128,12 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
/*
* Generate date based partitions under a parent dir with or without
hivestyle formatting.
*/
- private List<Path> generateDatePartitionsUnder(Path parent, boolean
hiveStyle) throws IOException {
+ private List<Path> generateDatePartitionsUnder(Path parent, boolean
hiveStyle, String dateFormat) throws IOException {
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern(dateFormat);
List<Path> datePartitions = new ArrayList<>();
String prefix = (hiveStyle ? "dt=" : "");
for (int i = 0; i < 5; i++) {
- Path child = new Path(parent, prefix + totalDates.get(i).toString());
+ Path child = new Path(parent, prefix +
formatter.format(totalDates.get(i)));
fs.mkdirs(child);
datePartitions.add(child);
}
@@ -155,9 +158,10 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
}
private static TypedProperties getProps(
- String basePath, int datePartitionDepth, int numDaysToList, String
currentDate) {
+ String basePath, String dateFormat, int datePartitionDepth, int
numDaysToList, String currentDate) {
TypedProperties properties = new TypedProperties();
properties.put(ROOT_INPUT_PATH_PROP, basePath);
+ properties.put(DATE_FORMAT, dateFormat);
properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
properties.put(LOOKBACK_DAYS, "" + numDaysToList);
properties.put(CURRENT_DATE, currentDate);
@@ -172,14 +176,14 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
private static Stream<Arguments> configParams() {
Object[][] data =
new Object[][] {
- {"table1", 0, 2, "2020-07-25", true, 1},
- {"table2", 0, 2, "2020-07-25", false, 1},
- {"table3", 1, 3, "2020-07-25", true, 4},
- {"table4", 1, 3, "2020-07-25", false, 4},
- {"table5", 2, 1, "2020-07-25", true, 10},
- {"table6", 2, 1, "2020-07-25", false, 10},
- {"table7", 3, 2, "2020-07-25", true, 75},
- {"table8", 3, 2, "2020-07-25", false, 75}
+ {"table1", "yyyyMMdd", 0, 2, "2020-07-25", true, 1},
+ {"table2", "yyyyMMdd", 0, 2, "2020-07-25", false, 1},
+ {"table3", "yyyyMMMdd", 1, 3, "2020-07-25", true, 4},
+ {"table4", "yyyyMMMdd", 1, 3, "2020-07-25", false, 4},
+ {"table5", "yyyy-MM-dd", 2, 1, "2020-07-25", true, 10},
+ {"table6", "yyyy-MM-dd", 2, 1, "2020-07-25", false, 10},
+ {"table7", "yyyy-MMM-dd", 3, 2, "2020-07-25", true, 75},
+ {"table8", "yyyy-MMM-dd", 3, 2, "2020-07-25", false, 75}
};
return Stream.of(data).map(Arguments::of);
}
@@ -188,13 +192,14 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
@MethodSource("configParams")
public void testPruneDatePartitionPaths(
String tableName,
+ String dateFormat,
int datePartitionDepth,
int numPrevDaysToList,
String currentDate,
boolean isHiveStylePartition,
int expectedNumFiles)
throws IOException {
- TypedProperties props = getProps(basePath + "/" + tableName,
datePartitionDepth, numPrevDaysToList, currentDate);
+ TypedProperties props = getProps(basePath + "/" + tableName, dateFormat,
datePartitionDepth, numPrevDaysToList, currentDate);
DatePartitionPathSelector pathSelector = new
DatePartitionPathSelector(props, jsc.hadoopConfiguration());
Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
@@ -203,10 +208,9 @@ public class TestDatePartitionPathSelector extends
HoodieClientTestHarness {
// Create parent dir
List<Path> leafDirs = new ArrayList<>();
createParentDirsBeforeDatePartitions(root, generateRandomStrings(),
totalDepthBeforeDatePartitions, leafDirs);
- createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+ createDatePartitionsWithFiles(leafDirs, isHiveStylePartition, dateFormat);
List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs,
root.toString());
-
- assertEquals(expectedNumFiles,
pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
+ assertEquals(expectedNumFiles, paths.size());
}
}