This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14d5d11  [HUDI-1406] Add date partition based source input selector 
for Delta streamer (#2264)
14d5d11 is described below

commit 14d5d1100c69839d2bcfea26af0efdcd2057650d
Author: Bhavani Sudha Saktheeswaran <[email protected]>
AuthorDate: Thu Dec 17 03:59:30 2020 -0800

    [HUDI-1406] Add date partition based source input selector for Delta 
streamer (#2264)
    
    - Adds ability to list only recent date based partitions from source data.
    - Parallelizes listing for faster tailing of DFSSources
---
 .../hudi/utilities/sources/AvroDFSSource.java      |   2 +-
 .../hudi/utilities/sources/CsvDFSSource.java       |   2 +-
 .../hudi/utilities/sources/JsonDFSSource.java      |   2 +-
 .../hudi/utilities/sources/ParquetDFSSource.java   |   2 +-
 .../utilities/sources/helpers/DFSPathSelector.java |  23 ++-
 .../sources/helpers/DatePartitionPathSelector.java | 209 ++++++++++++++++++++
 .../helpers/TestDatePartitionPathSelector.java     | 212 +++++++++++++++++++++
 7 files changed, 444 insertions(+), 8 deletions(-)

diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
index 1152cd6..aed6c6b 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroDFSSource.java
@@ -52,7 +52,7 @@ public class AvroDFSSource extends AvroSource {
   @Override
   protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> 
lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, 
sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, 
lastCkptStr, sourceLimit);
     return selectPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), 
selectPathsWithMaxModificationTime.getRight()))
         .orElseGet(() -> new InputBatch<>(Option.empty(), 
selectPathsWithMaxModificationTime.getRight()));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
index dc40b47..1ce50b2 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/CsvDFSSource.java
@@ -92,7 +92,7 @@ public class CsvDFSSource extends RowSource {
   protected Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr,
       long sourceLimit) {
     Pair<Option<String>, String> selPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, 
sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, 
lastCkptStr, sourceLimit);
     return Pair.of(fromFiles(
         selPathsWithMaxModificationTime.getLeft()), 
selPathsWithMaxModificationTime.getRight());
   }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
index d34289d..64da4f4 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonDFSSource.java
@@ -44,7 +44,7 @@ public class JsonDFSSource extends JsonSource {
   @Override
   protected InputBatch<JavaRDD<String>> fetchNewData(Option<String> 
lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, 
sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, 
lastCkptStr, sourceLimit);
     return selPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> new InputBatch<>(Option.of(fromFiles(pathStr)), 
selPathsWithMaxModificationTime.getRight()))
         .orElse(new InputBatch<>(Option.empty(), 
selPathsWithMaxModificationTime.getRight()));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
index 55d2de2..a56a878 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/ParquetDFSSource.java
@@ -45,7 +45,7 @@ public class ParquetDFSSource extends RowSource {
   @Override
   public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> 
lastCkptStr, long sourceLimit) {
     Pair<Option<String>, String> selectPathsWithMaxModificationTime =
-        pathSelector.getNextFilePathsAndMaxModificationTime(lastCkptStr, 
sourceLimit);
+        pathSelector.getNextFilePathsAndMaxModificationTime(sparkContext, 
lastCkptStr, sourceLimit);
     return selectPathsWithMaxModificationTime.getLeft()
         .map(pathStr -> Pair.of(Option.of(fromFiles(pathStr)), 
selectPathsWithMaxModificationTime.getRight()))
         .orElseGet(() -> Pair.of(Option.empty(), 
selectPathsWithMaxModificationTime.getRight()));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
index 47419e0..d9d3444 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DFSPathSelector.java
@@ -34,8 +34,10 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -43,7 +45,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-public class DFSPathSelector {
+public class DFSPathSelector implements Serializable {
 
   protected static volatile Logger log = 
LogManager.getLogger(DFSPathSelector.class);
 
@@ -90,13 +92,26 @@ public class DFSPathSelector {
   /**
    * Get the list of files changed since last checkpoint.
    *
+   * @param sparkContext JavaSparkContext to help parallelize certain 
operations
    * @param lastCheckpointStr the last checkpoint time string, empty if first 
run
    * @param sourceLimit       max bytes to read each time
    * @return the list of files concatenated and their latest modified time
    */
-  public Pair<Option<String>, String> 
getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
-      long sourceLimit) {
+  public Pair<Option<String>, String> 
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext, 
Option<String> lastCheckpointStr,
+                                                                             
long sourceLimit) {
+    return getNextFilePathsAndMaxModificationTime(lastCheckpointStr, 
sourceLimit);
+  }
 
+  /**
+   * Get the list of files changed since last checkpoint.
+   *
+   * @param lastCheckpointStr the last checkpoint time string, empty if first 
run
+   * @param sourceLimit       max bytes to read each time
+   * @return the list of files concatenated and their latest modified time
+   */
+  @Deprecated
+  public Pair<Option<String>, String> 
getNextFilePathsAndMaxModificationTime(Option<String> lastCheckpointStr,
+                                                                             
long sourceLimit) {
     try {
       // obtain all eligible files under root folder.
       log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) 
+ " source limit => " + sourceLimit);
@@ -136,7 +151,7 @@ public class DFSPathSelector {
   /**
   * List files recursively, filter out ineligible files/directories while 
doing so.
    */
-  private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long 
lastCheckpointTime) throws IOException {
+  protected List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long 
lastCheckpointTime) throws IOException {
     // skip files/dirs whose names start with (_, ., etc)
     FileStatus[] statuses = fs.listStatus(path, file ->
       IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> 
file.getName().startsWith(pfx)));
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
new file mode 100644
index 0000000..2cedb6c
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/DatePartitionPathSelector.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_DATE_PARTITION_DEPTH;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_LOOKBACK_DAYS;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DEFAULT_PARTITIONS_LIST_PARALLELISM;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.PARTITIONS_LIST_PARALLELISM;
+
+/**
+ * Custom dfs path selector used to list just the last few days provided there 
is a date based
+ * partition.
+ *
+ * <p>This is useful for workloads where there are multiple partition fields 
and only recent
+ * partitions are affected by new writes. Especially if the data sits in S3, 
listing all historical
+ * data can be time expensive and unnecessary for the above type of workload.
+ *
+ * <p>The date based partition is expected to be of the format '<date 
string>=yyyy-mm-dd' or
+ * 'yyyy-mm-dd'. The date partition can be at any level. For ex. the partition 
path can be of the
+ * form 
`<basepath>/<partition-field1>/<date-based-partition>/<partition-field3>/` or
+ * `<basepath>/<date-based-partition>/`
+ */
+public class DatePartitionPathSelector extends DFSPathSelector {
+
+  private static volatile Logger LOG = 
LogManager.getLogger(DatePartitionPathSelector.class);
+
+  private final int datePartitionDepth;
+  private final int numPrevDaysToList;
+  private final LocalDate fromDate;
+  private final LocalDate currentDate;
+  private final int partitionsListParallelism;
+
+  /** Configs supported. */
+  public static class Config {
+    public static final String DATE_PARTITION_DEPTH =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.depth";
+    public static final int DEFAULT_DATE_PARTITION_DEPTH = 0; // Implies no 
(date) partition
+
+    public static final String LOOKBACK_DAYS =
+        
"hoodie.deltastreamer.source.dfs.datepartitioned.selector.lookback.days";
+    public static final int DEFAULT_LOOKBACK_DAYS = 2;
+
+    public static final String CURRENT_DATE =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.currentdate";
+
+
+    public static final String PARTITIONS_LIST_PARALLELISM =
+        "hoodie.deltastreamer.source.dfs.datepartitioned.selector.parallelism";
+    public static final int DEFAULT_PARTITIONS_LIST_PARALLELISM = 20;
+  }
+
+  public DatePartitionPathSelector(TypedProperties props, Configuration 
hadoopConf) {
+    super(props, hadoopConf);
+    /*
+     * datePartitionDepth = 0 is same as basepath and there is no partition. 
In which case
+     * this path selector would be a no-op and lists all paths under the table 
basepath.
+     */
+    datePartitionDepth = props.getInteger(DATE_PARTITION_DEPTH, 
DEFAULT_DATE_PARTITION_DEPTH);
+    // If not specified the current date is assumed by default.
+    currentDate = LocalDate.parse(props.getString(Config.CURRENT_DATE, 
LocalDate.now().toString()));
+    numPrevDaysToList = props.getInteger(LOOKBACK_DAYS, DEFAULT_LOOKBACK_DAYS);
+    fromDate = currentDate.minusDays(numPrevDaysToList);
+    partitionsListParallelism = props.getInteger(PARTITIONS_LIST_PARALLELISM, 
DEFAULT_PARTITIONS_LIST_PARALLELISM);
+  }
+
+  @Override
+  public Pair<Option<String>, String> 
getNextFilePathsAndMaxModificationTime(JavaSparkContext sparkContext,
+                                                                             
Option<String> lastCheckpointStr,
+                                                                             
long sourceLimit) {
+    // obtain all eligible files under root folder.
+    LOG.info(
+        "Root path => "
+            + props.getString(ROOT_INPUT_PATH_PROP)
+            + " source limit => "
+            + sourceLimit
+            + " depth of day partition => "
+            + datePartitionDepth
+            + " num prev days to list => "
+            + numPrevDaysToList
+            + " from current date => "
+            + currentDate);
+    long lastCheckpointTime = 
lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
+    HoodieSparkEngineContext context = new 
HoodieSparkEngineContext(sparkContext);
+    SerializableConfiguration serializedConf = new 
SerializableConfiguration(fs.getConf());
+    List<String> prunedParitionPaths = pruneDatePartitionPaths(context, fs, 
props.getString(ROOT_INPUT_PATH_PROP));
+
+    List<FileStatus> eligibleFiles = context.flatMap(prunedParitionPaths,
+        path -> {
+          FileSystem fs = new Path(path).getFileSystem(serializedConf.get());
+          return listEligibleFiles(fs, new Path(path), 
lastCheckpointTime).stream();
+        }, partitionsListParallelism);
+    // sort them by modification time.
+    
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
+
+    // Filter based on checkpoint & input size, if needed
+    long currentBytes = 0;
+    long maxModificationTime = Long.MIN_VALUE;
+    List<FileStatus> filteredFiles = new ArrayList<>();
+    for (FileStatus f : eligibleFiles) {
+      if (currentBytes + f.getLen() >= sourceLimit) {
+        // we have enough data, we are done
+        break;
+      }
+
+      maxModificationTime = f.getModificationTime();
+      currentBytes += f.getLen();
+      filteredFiles.add(f);
+    }
+
+    // no data to read
+    if (filteredFiles.isEmpty()) {
+      return new ImmutablePair<>(
+          Option.empty(), lastCheckpointStr.orElseGet(() -> 
String.valueOf(Long.MIN_VALUE)));
+    }
+
+    // read the files out.
+    String pathStr = filteredFiles.stream().map(f -> 
f.getPath().toString()).collect(Collectors.joining(","));
+
+    return new ImmutablePair<>(Option.ofNullable(pathStr), 
String.valueOf(maxModificationTime));
+  }
+
+  /**
+   * Prunes date level partitions to last few days configured by 
'LOOKBACK_DAYS' from
+   * 'CURRENT_DATE'. Parallelizes listing by leveraging 
HoodieSparkEngineContext's methods.
+   */
+  public List<String> pruneDatePartitionPaths(HoodieSparkEngineContext 
context, FileSystem fs, String rootPath) {
+    List<String> partitionPaths = new ArrayList<>();
+    // get all partition paths before date partition level
+    partitionPaths.add(rootPath);
+    if (datePartitionDepth <= 0) {
+      return partitionPaths;
+    }
+    SerializableConfiguration serializedConf = new 
SerializableConfiguration(fs.getConf());
+    for (int i = 0; i < datePartitionDepth; i++) {
+      partitionPaths = context.flatMap(partitionPaths, path -> {
+        Path subDir = new Path(path);
+        FileSystem fileSystem = subDir.getFileSystem(serializedConf.get());
+        // skip files/dirs whose names start with (_, ., etc)
+        FileStatus[] statuses = fileSystem.listStatus(subDir,
+            file -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(pfx -> 
file.getName().startsWith(pfx)));
+        List<String> res = new ArrayList<>();
+        for (FileStatus status : statuses) {
+          res.add(status.getPath().toString());
+        }
+        return res.stream();
+      }, partitionsListParallelism);
+    }
+
+    // Prune date partitions to last few days
+    return context.getJavaSparkContext().parallelize(partitionPaths, 
partitionsListParallelism)
+        .filter(s -> {
+          String[] splits = s.split("/");
+          String datePartition = splits[splits.length - 1];
+          LocalDate partitionDate;
+          if (datePartition.contains("=")) {
+            String[] moreSplit = datePartition.split("=");
+            ValidationUtils.checkArgument(
+                moreSplit.length == 2,
+                "Partition Field (" + datePartition + ") not in expected 
format");
+            partitionDate = LocalDate.parse(moreSplit[1]);
+          } else {
+            partitionDate = LocalDate.parse(datePartition);
+          }
+          return (partitionDate.isEqual(fromDate) || 
partitionDate.isAfter(fromDate))
+              && (partitionDate.isEqual(currentDate) || 
partitionDate.isBefore(currentDate));
+        }).collect();
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
new file mode 100644
index 0000000..b7e1279
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestDatePartitionPathSelector.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.sources.helpers;
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.testutils.HoodieClientTestHarness;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.hadoop.fs.Path;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.time.LocalDate;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.utilities.sources.helpers.DFSPathSelector.Config.ROOT_INPUT_PATH_PROP;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.CURRENT_DATE;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.DATE_PARTITION_DEPTH;
+import static 
org.apache.hudi.utilities.sources.helpers.DatePartitionPathSelector.Config.LOOKBACK_DAYS;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestDatePartitionPathSelector extends HoodieClientTestHarness {
+
+  private transient HoodieSparkEngineContext context = null;
+  static List<LocalDate> totalDates;
+
+  @BeforeAll
+  public static void initClass() {
+    String s = "2020-07-21";
+    String e = "2020-07-25";
+    LocalDate start = LocalDate.parse(s);
+    LocalDate end = LocalDate.parse(e);
+    totalDates = new ArrayList<>();
+    while (!start.isAfter(end)) {
+      totalDates.add(start);
+      start = start.plusDays(1);
+    }
+  }
+
+  @BeforeEach
+  public void setup() {
+    initSparkContexts();
+    initPath();
+    initFileSystem();
+    context = new HoodieSparkEngineContext(jsc);
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    cleanupResources();
+  }
+
+  /*
+   * Create Date partitions with some files under each of the leaf Dirs.
+   */
+  public List<Path> createDatePartitionsWithFiles(List<Path> leafDirs, boolean 
hiveStyle)
+      throws IOException {
+    List<Path> allFiles = new ArrayList<>();
+    for (Path path : leafDirs) {
+      List<Path> datePartitions = generateDatePartitionsUnder(path, hiveStyle);
+      for (Path datePartition : datePartitions) {
+        allFiles.addAll(createRandomFilesUnder(datePartition));
+      }
+    }
+    return allFiles;
+  }
+
+  /**
+   * Create all parent level dirs before the date partitions.
+   *
+   * @param root Current parent dir. Initially this points to table basepath.
+   * @param dirs List of sub dirs to be created under root.
+   * @param depth Depth of partitions before date partitions.
+   * @param leafDirs Collect list of leaf dirs. These will be the immediate 
parents of date based partitions.
+   * @throws IOException
+   */
+  public void createParentDirsBeforeDatePartitions(Path root, List<String> 
dirs, int depth, List<Path> leafDirs)
+      throws IOException {
+    if (depth <= 0) {
+      leafDirs.add(root);
+      return;
+    }
+    for (String s : dirs) {
+      Path subdir = new Path(root, s);
+      fs.mkdirs(subdir);
+      createParentDirsBeforeDatePartitions(subdir, generateRandomStrings(), 
depth - 1, leafDirs);
+    }
+  }
+
+  /*
+   * Random string generation util used for generating file names or file 
contents.
+   */
+  private List<String> generateRandomStrings() {
+    List<String> subDirs = new ArrayList<>();
+    for (int i = 0; i < 5; i++) {
+      subDirs.add(UUID.randomUUID().toString());
+    }
+    return subDirs;
+  }
+
+  /*
+   * Generate date based partitions under a parent dir with or without 
hivestyle formatting.
+   */
+  private List<Path> generateDatePartitionsUnder(Path parent, boolean 
hiveStyle) throws IOException {
+    List<Path> datePartitions = new ArrayList<>();
+    String prefix = (hiveStyle ? "dt=" : "");
+    for (int i = 0; i < 5; i++) {
+      Path child = new Path(parent, prefix + totalDates.get(i).toString());
+      fs.mkdirs(child);
+      datePartitions.add(child);
+    }
+    return datePartitions;
+  }
+
+  /*
+   * Creates random files under the given directory.
+   */
+  private List<Path> createRandomFilesUnder(Path path) throws IOException {
+    List<Path> resultFiles = new ArrayList<>();
+    List<String> fileNames = generateRandomStrings();
+    for (String fileName : fileNames) {
+      List<String> fileContent = generateRandomStrings();
+      String[] lines = new String[fileContent.size()];
+      lines = fileContent.toArray(lines);
+      Path file = new Path(path, fileName);
+      UtilitiesTestBase.Helpers.saveStringsToDFS(lines, fs, file.toString());
+      resultFiles.add(file);
+    }
+    return resultFiles;
+  }
+
+  private static TypedProperties getProps(
+      String basePath, int datePartitionDepth, int numDaysToList, String 
currentDate) {
+    TypedProperties properties = new TypedProperties();
+    properties.put(ROOT_INPUT_PATH_PROP, basePath);
+    properties.put(DATE_PARTITION_DEPTH, "" + datePartitionDepth);
+    properties.put(LOOKBACK_DAYS, "" + numDaysToList);
+    properties.put(CURRENT_DATE, currentDate);
+    return properties;
+  }
+
+  /*
+   * Return test params => (table basepath, date partition's depth,
+   * num of prev days to list, current date, is date partition formatted in 
hive style?,
+   * expected number of paths after pruning)
+   */
+  private static Stream<Arguments> configParams() {
+    Object[][] data =
+        new Object[][] {
+          {"table1", 0, 2, "2020-07-25", true, 1},
+          {"table2", 0, 2, "2020-07-25", false, 1},
+          {"table3", 1, 3, "2020-07-25", true, 4},
+          {"table4", 1, 3, "2020-07-25", false, 4},
+          {"table5", 2, 1, "2020-07-25", true, 10},
+          {"table6", 2, 1, "2020-07-25", false, 10},
+          {"table7", 3, 2, "2020-07-25", true, 75},
+          {"table8", 3, 2, "2020-07-25", false, 75}
+        };
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  @ParameterizedTest(name = "[{index}] {0}")
+  @MethodSource("configParams")
+  public void testPruneDatePartitionPaths(
+      String tableName,
+      int datePartitionDepth,
+      int numPrevDaysToList,
+      String currentDate,
+      boolean isHiveStylePartition,
+      int expectedNumFiles)
+      throws IOException {
+    TypedProperties props = getProps(basePath + "/" + tableName, 
datePartitionDepth, numPrevDaysToList, currentDate);
+    DatePartitionPathSelector pathSelector = new 
DatePartitionPathSelector(props, jsc.hadoopConfiguration());
+
+    Path root = new Path(props.getString(ROOT_INPUT_PATH_PROP));
+    int totalDepthBeforeDatePartitions = 
props.getInteger(DATE_PARTITION_DEPTH) - 1;
+
+    // Create parent dir
+    List<Path> leafDirs = new ArrayList<>();
+    createParentDirsBeforeDatePartitions(root, generateRandomStrings(), 
totalDepthBeforeDatePartitions, leafDirs);
+    createDatePartitionsWithFiles(leafDirs, isHiveStylePartition);
+
+    List<String> paths = pathSelector.pruneDatePartitionPaths(context, fs, 
root.toString());
+
+    assertEquals(expectedNumFiles, 
pathSelector.pruneDatePartitionPaths(context, fs, root.toString()).size());
+  }
+}

Reply via email to