This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 14d5d11 [HUDI-1406] Add date partition based source input selector
for Delta streamer (#2264)
14d5d11 is described below
commit 14d5d1100c69839d2bcfea26af0efdcd2057650d
Author: Bhavani Sudha Saktheeswaran <[email protected]>
AuthorDate: Thu Dec 17 03:59:30 2020 -0800
[HUDI-1406] Add date partition based source input selector for Delta
streamer (#2264)
- Adds ability to list only recent date based partitions from source data.
- Parallelizes listing for faster tailing of DFSSources
---
.../hudi/utilities/sources/AvroDFSSource.java | 2 +-
.../hudi/utilities/sources/CsvDFSSource.java | 2 +-
.../hudi/utilities/sources/JsonDFSSource.java | 2 +-
.../hudi/utilities/sources/ParquetDFSSource.java | 2 +-
.../utilities/sources/helpers/DFSPathSelector.java | 23 ++-
.../sources/helpers/DatePartitionPathSelector.java | 209 ++++++++++++++++++++
.../helpers/TestDatePartitionPathSelector.java | 212 +++++++++++++++++++++
7 files changed, 444 insertions(+), 8 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index 1152cd6..aed6c6b 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -52,7 +52,7 @@ public class AvroDFSSource extends AvroSource {
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String>
lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
- pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
+ pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext,
lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> new InputBatch<>(Option.empty(),
selectPathsWithMaxModificationTime.getRight()));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
index dc40b47..1ce50b2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -92,7 +92,7 @@ public class CsvDFSSource extends RowSource {
protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr,
long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
- pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
+ pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext,
lastCkptStr, sourceLimit);
return Pair.of(fromFiles(
selPathsWithMaxModificationTime.getLeft()),
selPathsWithMaxModificationTime.getRight());
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index d34289d..64da4f4 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -44,7 +44,7 @@ public class JsonDFSSource extends JsonSource {
@Override
protected InputBatch<JavaRDD<String>> fetchNewData(Option<String>
lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selPathsWithMaxModificationTime =
- pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
+ pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext,
lastCkptStr, sourceLimit);
return selPathsWithMaxModificationTime.getLeft()
.map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)),
selPathsWithMaxModificationTime.getRight()))
.orElse(new InputBatch<>(Option.empty(),
selPathsWithMaxModificationTime.getRight()));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 55d2de2..a56a878 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -45,7 +45,7 @@ public class ParquetDFSSource extends RowSource {
@Override
public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String>
lastCkptStr, long sourceLimit) {
Pair<Option<String>, String> selectPathsWithMaxModificationTime =
- pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr,
sourceLimit);
+ pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext,
lastCkptStr, sourceLimit);
return selectPathsWithMaxModificationTime.getLeft()
.map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)),
selectPathsWithMaxModificationTime.getRight()))
.orElseGet(() -> Pair.of(Option.empty(),
selectPathsWithMaxModificationTime.getRight()));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 47419e0..d9d3444 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -43,7 +45,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
-public class DFSPathSelector {
+public class DFSPathSelector implements Serializable {
protected static volatile Logger log =
LogManager.getLogger(DFSPathSelector.class);
@@ -90,13 +92,26 @@ public class DFSPathSelector {
/**
* Get the list of files changed since last checkpoint.
*
+ * @param sparkContext JavaSparkContext to help parallelize certain
operations
* @param lastCheckpointStr the last checkpoint time string, empty if first
run
* @param sourceLimit max bytes to read each time
* @return the list of files concatenated and their latest modified time
*/
- public Pair<Option<String>, String>
getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
- long sourceLimit) {
+ public Pair<Option<String>, String>
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
Option<String> lastCheckpointStr,
+
long sourceLimit) {
+ return getNextFilePathsAndMaxModificationTime(lastCheckpointStr,
sourceLimit);
+ }
+ /**
+ * Get the list of files changed since last checkpoint.
+ *
+ * @param lastCheckpointStr the last checkpoint time string, empty if first
run
+ * @param sourceLimit max bytes to read each time
+ * @return the list of files concatenated and their latest modified time
+ */
+ @Deprecated
+ public Pair<Option<String>, String>
getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
+
long sourceLimit) {
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP)
+ " source limit => " + sourceLimit);
@@ -136,7 +151,7 @@ public class DFSPathSelector {
/**
* List files recursively, filter out illegible files/directories while
doing so.
*/
- private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long
lastCheckpointTime) throws IOException {
+ protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long
lastCheckpointTime) throws IOException {
// skip files/dirs whose names start with (_, ., etc)
FileStatus[] statuses = fs.listStatus(path, file ->
IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx ->
file.getName().startsWith(pfx)));
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
new file mode 100644
index 0000000..2cedb6c
--- /dev/null
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there
is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields
and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3,
listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date
string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition
path can be of the
+ * form
`<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+
+ private static volatile Logger LOG =
LogManager.getLogger(DatePartitionPathSelector.class);
+
+ private final int datePartitionDepth;
+ private final int numPrevDaysToList;
+ private final LocalDate fromDate;
+ private final LocalDate currentDate;
+ private final int partitionsListParallelism;
+
+ /** Configs supported. */
+ public static class Config {
+ public static final String DATE_PARTITION_DEPTH =
+ "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
+ public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no
(date) partition
+
+ public static final String LOOKBACK_DAYS =
+
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days";
+ public static final int DEFAULT_LOOKBACK_DAYS = 2;
+
+ public static final String CURRENT_DATE =
+ "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
+
+
+ public static final String PARTITIONS_LIST_PARALLELISM =
+ "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
+ public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+ }
+
+ public DatePartitionPathSelector(TypedProperties props, Configuration
hadoopConf) {
+ super(props, hadoopConf);
+ /*
+ * datePartitionDepth = 0 is same as basepath and there is no partition.
In which case
+ * this path selector would be a no-op and lists all paths under the table
basepath.
+ */
+ datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH,
DEFAULT_DATE_PARTITION_DEPTH);
+ // If not specified the current date is assumed by default.
+ currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE,
LocalDate.now().toString()));
+ numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+ fromDate = currentDate.minusDays(numPrevDaysToList);
+ partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM,
DEFAULT_PARTITIONS_LIST_PARALLELISM);
+ }
+
+ @Override
+ public Pair<Option<String>, String>
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
+
Option<String> lastCheckpointStr,
+
long sourceLimit) {
+ // obtain all eligible files under root folder.
+ LOG.info(
+ "Root path => "
+ + props.getString(ROOT_INPUT_PATH_PROP)
+ + " source limit => "
+ + sourceLimit
+ + " depth of day partition => "
+ + datePartitionDepth
+ + " num prev days to list => "
+ + numPrevDaysToList
+ + " from current date => "
+ + currentDate);
+ long lastCheckpointTime =
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+ HoodieSparkEngineContext context = new
HoodieSparkEngineContext(sparkContext);
+ SerializableConfiguration serializedConf = new
SerializableConfiguration(fs.getConf());
+ List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs,
props.getString(ROOT_INPUT_PATH_PROP));
+
+ List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
+ path -> {
+ FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
+ return listEligibleFiles(fs, new Path(path),
lastCheckpointTime).stream();
+ }, partitionsListParallelism);
+ // sort them by modification time.
+
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+
+ // Filter based on checkpoint & input size, if needed
+ long currentBytes = 0;
+ long maxModificationTime = Long.MIN_VALUE;
+ List<FileStatus> filteredFiles = new ArrayList<>();
+ for (FileStatus f : eligibleFiles) {
+ if (currentBytes + f.getLen() >= sourceLimit) {
+ // we have enough data, we are done
+ break;
+ }
+
+ maxModificationTime = f.getModificationTime();
+ currentBytes += f.getLen();
+ filteredFiles.add(f);
+ }
+
+ // no data to read
+ if (filteredFiles.isEmpty()) {
+ return new ImmutablePair<>(
+ Option.empty(), lastCheckpointStr.orElseGet(() ->
String.valueOf(Long.MIN_VALUE)));
+ }
+
+ // read the files out.
+ String pathStr = filteredFiles.stream().map(f ->
f.getPath().toString()).collect(Collectors.joining(","));
+
+ return new ImmutablePair<>(Option.ofNullable(pathStr),
String.valueOf(maxModificationTime));
+ }
+
+ /**
+ * Prunes date level partitions to last few days configured by
'NUM_PREV_DAYS_TO_LIST' from
+ * 'CURRENT_DATE'. Parallelizes listing by leveraging
HoodieSparkEngineContext's methods.
+ */
+ public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext
context, FileSystem fs, String rootPath) {
+ List<String> partitionPaths = new ArrayList<>();
+ // get all partition paths before date partition level
+ partitionPaths.add(rootPath);
+ if (datePartitionDepth <= 0) {
+ return partitionPaths;
+ }
+ SerializableConfiguration serializedConf = new
SerializableConfiguration(fs.getConf());
+ for (int i = 0; i < datePartitionDepth; i++) {
+ partitionPaths = context.flatMap(partitionPaths, path -> {
+ Path subDir = new Path(path);
+ FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+ // skip files/dirs whose names start with (_, ., etc)
+ FileStatus[] statuses = fileSystem.listStatus(subDir,
+ file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx ->
file.getName().startsWith(pfx)));
+ List<String> res = new ArrayList<>();
+ for (FileStatus status : statuses) {
+ res.add(status.getPath().toString());
+ }
+ return res.stream();
+ }, partitionsListParallelism);
+ }
+
+ // Prune date partitions to last few days
+ return context.getJavaSparkContext().parallelize(partitionPaths,
partitionsListParallelism)
+ .filter(s -> {
+ String[] splits = s.split("/");
+ String datePartition = splits[splits.length - 1];
+ LocalDate partitionDate;
+ if (datePartition.contains("=")) {
+ String[] moreSplit = datePartition.split("=");
+ ValidationUtils.checkArgument(
+ moreSplit.length == 2,
+ "Partition Field (" + datePartition + ") not in expected
format");
+ partitionDate = LocalDate.parse(moreSplit[1]);
+ } else {
+ partitionDate = LocalDate.parse(datePartition);
+ }
+ return (partitionDate.isEqual(fromDate) ||
partitionDate.isAfter(fromDate))
+ && (partitionDate.isEqual(currentDate) ||
partitionDate.isBefore(currentDate));
+ }).collect();
+ }
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
new file mode 100644
index 0000000..b7e1279
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
+
+ private transient HoodieSparkEngineContext context = null;
+ static List<LocalDate> totalDates;
+
+ @BeforeAll
+ public static void initClass() {
+ String s = "2020-07-21";
+ String e = "2020-07-25";
+ LocalDate start = LocalDate.parse(s);
+ LocalDate end = LocalDate.parse(e);
+ totalDates = new ArrayList<>();
+ while (!start.isAfter(end)) {
+ totalDates.add(start);
+ start = start.plusDays(1);
+ }
+ }
+
+ @BeforeEach
+ public void setup() {
+ initSparkContexts();
+ initPath();
+ initFileSystem();
+ context = new HoodieSparkEngineContext(jsc);
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ cleanupResources();
+ }
+
+ /*
+ * Create Date partitions with some files under each of the leaf Dirs.
+ */
+ public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean
hiveStyle)
+ throws IOException {
+ List<Path> allFiles = new ArrayList<>();
+ for (Path path : leafDirs) {
+ List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+ for (Path datePartition : datePartitions) {
+ allFiles.addAll(createRandomFilesUnder(datePartition));
+ }
+ }
+ return allFiles;
+ }
+
+ /**
+ * Create all parent level dirs before the date partitions.
+ *
+ * @param root Current parent dir. Initially this points to table basepath.
+ * @param dirs List o sub dirs to be created under root.
+ * @param depth Depth of partitions before date partitions.
+ * @param leafDirs Collect list of leaf dirs. These will be the immediate
parents of date based partitions.
+ * @throws IOException
+ */
+ public void createParentDirsBeforeDatePartitions(Path root, List<String>
dirs, int depth, List<Path> leafDirs)
+ throws IOException {
+ if (depth <= 0) {
+ leafDirs.add(root);
+ return;
+ }
+ for (String s : dirs) {
+ Path subdir = new Path(root, s);
+ fs.mkdirs(subdir);
+ createParentDirsBeforeDatePartitions(subdir, generateRandomStrings(),
depth - 1, leafDirs);
+ }
+ }
+
+ /*
+ * Random string generation util used for generating file names or file
contents.
+ */
+ private List<String> generateRandomStrings() {
+ List<String> subDirs = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ subDirs.add(UUID.randomUUID().toString());
+ }
+ return subDirs;
+ }
+
+ /*
+ * Generate date based partitions under a parent dir with or without
hivestyle formatting.
+ */
+ private List<Path> generateDatePartitionsUnder(Path parent, boolean
hiveStyle) throws IOException {
+ List<Path> datePartitions = new ArrayList<>();
+ String prefix = (hiveStyle ? "dt=" : "");
+ for (int i = 0; i < 5; i++) {
+ Path child = new Path(parent, prefix + totalDates.get(i).toString());
+ fs.mkdirs(child);
+ datePartitions.add(child);
+ }
+ return datePartitions;
+ }
+
+ /*
+ * Creates random files under the given directory.
+ */
+ private List<Path> createRandomFilesUnder(Path path) throws IOException {
+ List<Path> resultFiles = new ArrayList<>();
+ List<String> fileNames = generateRandomStrings();
+ for (String fileName : fileNames) {
+ List<String> fileContent = generateRandomStrings();
+ String[] lines = new String[fileContent.size()];
+ lines = fileContent.toArray(lines);
+ Path file = new Path(path, fileName);
+ UtilitiesTestBase.Helpers.saveStringsToDFS(lines, fs, file.toString());
+ resultFiles.add(file);
+ }
+ return resultFiles;
+ }
+
+ private static TypedProperties getProps(
+ String basePath, int datePartitionDepth, int numDaysToList, String
currentDate) {
+ TypedProperties properties = new TypedProperties();
+ properties.put(ROOT_INPUT_PATH_PROP, basePath);
+ properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
+ properties.put(LOOKBACK_DAYS, "" + numDaysToList);
+ properties.put(CURRENT_DATE, currentDate);
+ return properties;
+ }
+
+ /*
+ * Return test params => (table basepath, date partition's depth,
+ * num of prev days to list, current date, is date partition formatted in
hive style?,
+ * expected number of paths after pruning)
+ */
+ private static Stream<Arguments> configParams() {
+ Object[][] data =
+ new Object[][] {
+ {"table1", 0, 2, "2020-07-25", true, 1},
+ {"table2", 0, 2, "2020-07-25", false, 1},
+ {"table3", 1, 3, "2020-07-25", true, 4},
+ {"table4", 1, 3, "2020-07-25", false, 4},
+ {"table5", 2, 1, "2020-07-25", true, 10},
+ {"table6", 2, 1, "2020-07-25", false, 10},
+ {"table7", 3, 2, "2020-07-25", true, 75},
+ {"table8", 3, 2, "2020-07-25", false, 75}
+ };
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ @ParameterizedTest(name = "[{index}] {0}")
+ @MethodSource("configParams")
+ public void testPruneDatePartitionPaths(
+ String tableName,
+ int datePartitionDepth,
+ int numPrevDaysToList,
+ String currentDate,
+ boolean isHiveStylePartition,
+ int expectedNumFiles)
+ throws IOException {
+ TypedProperties props = getProps(basePath + "/" + tableName,
datePartitionDepth, numPrevDaysToList, currentDate);
+ DatePartitionPathSelector pathSelector = new
DatePartitionPathSelector(props, jsc.hadoopConfiguration());
+
+ Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+ int totalDepthBeforeDatePartitions =
props.getInteger(DATE_PARTITION_DEPTH) - 1;
+
+ // Create parent dir
+ List<Path> leafDirs = new ArrayList<>();
+ createParentDirsBeforeDatePartitions(root, generateRandomStrings(),
totalDepthBeforeDatePartitions, leafDirs);
+ createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+
+ List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs,
root.toString());
+
+ assertEquals(expectedNumFiles,
pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
+ }
+}