This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new afb6a7c  [GOBBLIN-1054] Refactor HiveSource to make partition filter extensible
afb6a7c is described below

commit afb6a7c48b692ccce653637d877bf07241a59661
Author: Kumar Kandasami <[email protected]>
AuthorDate: Thu Feb 27 18:12:41 2020 -0800

    [GOBBLIN-1054] Refactor HiveSource to make partition filter extensible
    
    Closes #2894 from KumaravelKandasami/master
---
 .../conversion/hive/source/HiveSource.java         | 20 +++---
 .../filter/DateRangePartitionFilterGenerator.java  | 79 ++++++++++++++++++++++
 .../filter/LookbackPartitionFilterGenerator.java   | 43 +++++++-----
 ... => DateRangePartitionFilterGeneratorTest.java} | 31 ++++++---
 .../LookbackPartitionFilterGeneratorTest.java      | 13 +++-
 5 files changed, 146 insertions(+), 40 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index eb16d59..cf1477a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -61,6 +61,7 @@ import 
org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionL
 import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.data.management.copy.hive.HiveUtils;
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
 import 
org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.instrumented.Instrumented;
@@ -112,6 +113,9 @@ public class HiveSource implements Source {
   public static final String HIVE_SOURCE_DATASET_FINDER_CLASS_KEY = 
"hive.dataset.finder.class";
   public static final String DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS = 
HiveDatasetFinder.class.getName();
 
+  public static final String HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY = 
"hive.dataset.finder.partitionfilter.class";
+  public static final String 
DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS = 
LookbackPartitionFilterGenerator.class.getName();
+
   public static final String DISTCP_REGISTRATION_GENERATION_TIME_KEY = 
"registrationGenerationTimeMillis";
   public static final String HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY = 
"hive.source.watermarker.factoryClass";
   public static final String DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS = 
PartitionLevelWatermarker.Factory.class.getName();
@@ -147,6 +151,7 @@ public class HiveSource implements Source {
   protected HiveSourceWatermarker watermarker;
   protected IterableDatasetFinder<HiveDataset> datasetFinder;
   protected List<WorkUnit> workunits;
+  protected PartitionFilterGenerator partitionFilterGenerator;
   protected long maxLookBackTime;
   protected long beginGetWorkunitsTime;
   protected List<String> ignoreDataPathIdentifierList;
@@ -216,6 +221,9 @@ public class HiveSource implements Source {
     this.maxLookBackTime = new 
DateTime().minusDays(maxLookBackDays).getMillis();
     this.ignoreDataPathIdentifierList = 
COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY,
         DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
+    this.partitionFilterGenerator = 
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+        state.getProp(HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY,
+        DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS), 
state.getProperties());
 
     silenceHiveLoggers();
   }
@@ -300,17 +308,7 @@ public class HiveSource implements Source {
     long tableProcessTime = new DateTime().getMillis();
     this.watermarker.onTableProcessBegin(hiveDataset.getTable(), 
tableProcessTime);
 
-    Optional<String> partitionFilter = Optional.absent();
-
-    // If the table is date partitioned, use the partition name to filter 
partitions older than lookback
-    if 
(hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.PARTITION_COLUMN)
-        && 
hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.DATETIME_FORMAT)
-        && 
hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.LOOKBACK))
 {
-      partitionFilter =
-          Optional.of(new 
LookbackPartitionFilterGenerator(hiveDataset.getProperties()).getFilter(hiveDataset));
-      log.info(String.format("Getting partitions for %s using partition filter 
%s", hiveDataset.getTable()
-          .getCompleteName(), partitionFilter.get()));
-    }
+    Optional<String> partitionFilter = 
Optional.fromNullable(this.partitionFilterGenerator.getFilter(hiveDataset));
 
     List<Partition> sourcePartitions = HiveUtils.getPartitions(client.get(), 
hiveDataset.getTable(), partitionFilter);
 
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java
new file mode 100644
index 0000000..395b40c
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.hive.filter;
+
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * {@link PartitionFilterGenerator} that restricts hive partitions to a date range with a
+ * {@code BETWEEN ... AND ...} partition filter.
+ *
+ * <p>
+ *   Configuration is read from the {@link Properties} passed to the constructor (or from the JVM
+ *   system properties when {@code null} is passed). All of {@link #PARTITION_COLUMN},
+ *   {@link #START_DATE} and {@link #END_DATE} are required; {@link #PARTITION_COLUMN} uses the
+ *   same key as {@link LookbackPartitionFilterGenerator#PARTITION_COLUMN}.
+ * </p>
+ *
+ * <p>
+ *   The generated filter has the form {@code <column> between "<start>" and "<end>"} (double
+ *   quotes), e.g. {@code datePartition between "2020-01-01" and "2020-01-10"}. The start and end
+ *   values are inserted verbatim (no parsing, validation or escaping), so they must already be
+ *   written in the partition column's value format. NOTE(review): the bounds are quoted string
+ *   literals, so the date format presumably has to sort lexicographically in date order - confirm.
+ * </p>
+ *
+ * <p>
+ *   The configuration is checked on every {@link #getFilter} call, not in the constructor. If a
+ *   required key is missing, an error is logged and {@code null} (no filter) is returned.
+ * </p>
+ */
+@Slf4j
+public class DateRangePartitionFilterGenerator implements 
PartitionFilterGenerator {
+
+  public static final String PARTITION_COLUMN = 
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.column"; // partition column to filter on
+  public static final String START_DATE = 
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.startdate"; // lower bound, inserted verbatim
+  public static final String END_DATE = HiveDatasetFinder.HIVE_DATASET_PREFIX 
+ ".partition.filter.datetime.enddate"; // upper bound, inserted verbatim
+
+  private final Properties prop; // holds the three keys above; re-read on every getFilter call
+
+  public DateRangePartitionFilterGenerator(Properties properties) {
+    this.prop = (properties == null) ? System.getProperties(): properties; // null => fall back to JVM system properties
+  }
+
+  @Override
+  public String getFilter(HiveDataset hiveDataset) { // hiveDataset is only used in the log message and may be null
+
+    if (isValidConfig()) {
+      String partitionColumn = this.prop.getProperty(PARTITION_COLUMN);
+      String startDate = this.prop.getProperty(START_DATE);
+      String endDate = this.prop.getProperty(END_DATE);
+
+      String partitionFilter =String.format("%s between \"%s\" and \"%s\"", 
partitionColumn, startDate, endDate);
+
+      log.info(String.format("Getting partitions for %s using partition filter 
%s", ((hiveDataset == null) ? "null" :  hiveDataset.getTable()
+          .getCompleteName()), partitionFilter));
+      return partitionFilter;
+    } else {
+      log.error(DateRangePartitionFilterGenerator.class.getName()
+          + " requires the following properties " + Arrays.toString(new 
String[]{PARTITION_COLUMN, START_DATE, END_DATE}));
+
+      return null; // missing configuration is logged, not thrown; null means no filter is produced
+    }
+  }
+
+  private boolean isValidConfig() { // checks key presence only: empty or malformed values still pass
+    return 
this.prop.containsKey(DateRangePartitionFilterGenerator.PARTITION_COLUMN)
+        && this.prop.containsKey(DateRangePartitionFilterGenerator.START_DATE)
+        && this.prop.containsKey(DateRangePartitionFilterGenerator.END_DATE);
+  }
+}
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
index 6c80940..d271526 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
@@ -20,13 +20,12 @@ package org.apache.gobblin.data.management.copy.hive.filter;
 import java.util.Arrays;
 import java.util.Properties;
 
+import lombok.extern.slf4j.Slf4j;
 import org.joda.time.DateTime;
 import org.joda.time.Period;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
-import com.google.common.base.Preconditions;
-
 import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
@@ -41,34 +40,44 @@ import 
org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
  *   must be such that lexycographical string and date ordering are compatible.
  * </p>
  */
+@Slf4j
 public class LookbackPartitionFilterGenerator implements 
PartitionFilterGenerator {
 
   public static final String PARTITION_COLUMN = 
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.column";
   public static final String LOOKBACK = HiveDatasetFinder.HIVE_DATASET_PREFIX 
+ ".partition.filter.datetime.lookback";
   public static final String DATETIME_FORMAT = 
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.format";
-  private static final String ERROR_MESSAGE = 
LookbackPartitionFilterGenerator.class.getName()
-      + " requires the following properties " + Arrays.toString(new 
String[]{PARTITION_COLUMN, LOOKBACK, DATETIME_FORMAT});
-
-  private final String partitionColumn;
-  private final Period lookback;
-  private final DateTimeFormatter formatter;
 
+  private final Properties prop;
 
   public LookbackPartitionFilterGenerator(Properties properties) {
-    Preconditions.checkArgument(properties.containsKey(PARTITION_COLUMN), 
ERROR_MESSAGE);
-    Preconditions.checkArgument(properties.containsKey(LOOKBACK), 
ERROR_MESSAGE);
-    Preconditions.checkArgument(properties.containsKey(DATETIME_FORMAT), 
ERROR_MESSAGE);
-
-    this.partitionColumn = properties.getProperty(PARTITION_COLUMN);
-    this.lookback = Period.parse(properties.getProperty(LOOKBACK));
-    this.formatter = 
DateTimeFormat.forPattern(properties.getProperty(DATETIME_FORMAT));
+    this.prop = (properties == null) ? System.getProperties(): properties;
   }
 
   @Override
   public String getFilter(HiveDataset hiveDataset) {
 
-    DateTime limitDate = (new DateTime()).minus(this.lookback);
+    if (isValidConfig()) {
+      String partitionColumn = this.prop.getProperty(PARTITION_COLUMN);
+      Period lookback = Period.parse(this.prop.getProperty(LOOKBACK));
+      DateTimeFormatter formatter = 
DateTimeFormat.forPattern(this.prop.getProperty(DATETIME_FORMAT));
+
+      DateTime limitDate = (new DateTime()).minus(lookback);
+
+      String partitionFilter = String.format("%s >= \"%s\"", partitionColumn, 
formatter.print(limitDate));
+      log.info(String.format("Getting partitions for %s using partition filter 
%s", ((hiveDataset == null) ? "null" :  hiveDataset.getTable()
+          .getCompleteName()), partitionFilter));
+      return partitionFilter;
+    } else {
+      log.error(LookbackPartitionFilterGenerator.class.getName()
+          + " requires the following properties " + Arrays.toString(new 
String[]{PARTITION_COLUMN, LOOKBACK, DATETIME_FORMAT}));
+
+      return null;
+    }
+  }
 
-    return String.format("%s >= \"%s\"", this.partitionColumn, 
this.formatter.print(limitDate));
+  private boolean isValidConfig() {
+    return 
this.prop.containsKey(LookbackPartitionFilterGenerator.PARTITION_COLUMN)
+        && 
this.prop.containsKey(LookbackPartitionFilterGenerator.DATETIME_FORMAT)
+        && this.prop.containsKey(LookbackPartitionFilterGenerator.LOOKBACK);
   }
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
similarity index 55%
copy from gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
copy to gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
index f354cd8..cd7ec43 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
@@ -18,7 +18,8 @@
 package org.apache.gobblin.data.management.copy.hive.filter;
 
 import java.util.Properties;
-
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeUtils;
 import org.testng.Assert;
@@ -26,8 +27,9 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+
 @Test(groups = { "SystemTimeTests"})
-public class LookbackPartitionFilterGeneratorTest {
+public class DateRangePartitionFilterGeneratorTest {
 
   @BeforeMethod
   public void setUp()
@@ -42,20 +44,27 @@ public class LookbackPartitionFilterGeneratorTest {
   }
 
   @Test
+  public void testInitialization() {
+    PartitionFilterGenerator filter = 
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+        DateRangePartitionFilterGenerator.class.getName(), 
System.getProperties());
+    Assert.assertTrue(filter instanceof DateRangePartitionFilterGenerator);
+  }
+  @Test
   public void test() {
-    doTest("datePartition", "P1D", "YYYY-MM-dd-HH", "datePartition >= 
\"2016-03-14-10\"");
-    doTest("datePartition", "P2D", "YYYY-MM-dd-HH", "datePartition >= 
\"2016-03-13-10\"");
-    doTest("datePartition", "PT4H", "YYYY-MM-dd-HH", "datePartition >= 
\"2016-03-15-06\"");
-    doTest("myColumn", "PT4H", "YYYY-MM-dd-HH", "myColumn >= 
\"2016-03-15-06\"");
+    doTest("datePartition", "2020-01-01", "2020-01-10", "datePartition between 
\"2020-01-01\" and \"2020-01-10\"");
   }
 
-  private void doTest(String column, String lookback, String format, String 
expected) {
+
+  private void doTest(String column, String sDate, String eDate, String 
expected) {
     Properties properties = new Properties();
-    properties.put(LookbackPartitionFilterGenerator.PARTITION_COLUMN, column);
-    properties.put(LookbackPartitionFilterGenerator.LOOKBACK, lookback);
-    properties.put(LookbackPartitionFilterGenerator.DATETIME_FORMAT, format);
+    properties.put(DateRangePartitionFilterGenerator.PARTITION_COLUMN, column);
+    properties.put(DateRangePartitionFilterGenerator.START_DATE, sDate);
+    properties.put(DateRangePartitionFilterGenerator.END_DATE, eDate);
+
+    PartitionFilterGenerator filterImpl = 
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+        DateRangePartitionFilterGenerator.class.getName(), properties);
 
-    Assert.assertEquals(new 
LookbackPartitionFilterGenerator(properties).getFilter(null), expected);
+    Assert.assertEquals(filterImpl.getFilter(null), expected);
   }
 
 }
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
index f354cd8..92181b6 100644
--- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.data.management.copy.hive.filter;
 
 import java.util.Properties;
 
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeUtils;
 import org.testng.Assert;
@@ -42,6 +44,12 @@ public class LookbackPartitionFilterGeneratorTest {
   }
 
   @Test
+  public void testInitialization() {
+    PartitionFilterGenerator filter = 
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+              LookbackPartitionFilterGenerator.class.getName(), 
System.getProperties());
+    Assert.assertTrue(filter instanceof LookbackPartitionFilterGenerator);
+  }
+  @Test
   public void test() {
     doTest("datePartition", "P1D", "YYYY-MM-dd-HH", "datePartition >= 
\"2016-03-14-10\"");
     doTest("datePartition", "P2D", "YYYY-MM-dd-HH", "datePartition >= 
\"2016-03-13-10\"");
@@ -55,7 +63,10 @@ public class LookbackPartitionFilterGeneratorTest {
     properties.put(LookbackPartitionFilterGenerator.LOOKBACK, lookback);
     properties.put(LookbackPartitionFilterGenerator.DATETIME_FORMAT, format);
 
-    Assert.assertEquals(new 
LookbackPartitionFilterGenerator(properties).getFilter(null), expected);
+    PartitionFilterGenerator filterImpl = 
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+        LookbackPartitionFilterGenerator.class.getName(), properties);
+
+    Assert.assertEquals(filterImpl.getFilter(null), expected);
   }
 
 }

Reply via email to