This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new afb6a7c [GOBBLIN-1054] Refactor HiveSource to make partition filter
extensible
afb6a7c is described below
commit afb6a7c48b692ccce653637d877bf07241a59661
Author: Kumar Kandasami <[email protected]>
AuthorDate: Thu Feb 27 18:12:41 2020 -0800
[GOBBLIN-1054] Refactor HiveSource to make partition filter extensible
Closes #2894 from KumaravelKandasami/master
---
.../conversion/hive/source/HiveSource.java | 20 +++---
.../filter/DateRangePartitionFilterGenerator.java | 79 ++++++++++++++++++++++
.../filter/LookbackPartitionFilterGenerator.java | 43 +++++++-----
... => DateRangePartitionFilterGeneratorTest.java} | 31 ++++++---
.../LookbackPartitionFilterGeneratorTest.java | 13 +++-
5 files changed, 146 insertions(+), 40 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index eb16d59..cf1477a 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -61,6 +61,7 @@ import
org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.HiveUtils;
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
import
org.apache.gobblin.data.management.copy.hive.filter.LookbackPartitionFilterGenerator;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.instrumented.Instrumented;
@@ -112,6 +113,9 @@ public class HiveSource implements Source {
public static final String HIVE_SOURCE_DATASET_FINDER_CLASS_KEY =
"hive.dataset.finder.class";
public static final String DEFAULT_HIVE_SOURCE_DATASET_FINDER_CLASS =
HiveDatasetFinder.class.getName();
+ public static final String HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY =
"hive.dataset.finder.partitionfilter.class";
+ public static final String
DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS =
LookbackPartitionFilterGenerator.class.getName();
+
public static final String DISTCP_REGISTRATION_GENERATION_TIME_KEY =
"registrationGenerationTimeMillis";
public static final String HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY =
"hive.source.watermarker.factoryClass";
public static final String DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS =
PartitionLevelWatermarker.Factory.class.getName();
@@ -147,6 +151,7 @@ public class HiveSource implements Source {
protected HiveSourceWatermarker watermarker;
protected IterableDatasetFinder<HiveDataset> datasetFinder;
protected List<WorkUnit> workunits;
+ protected PartitionFilterGenerator partitionFilterGenerator;
protected long maxLookBackTime;
protected long beginGetWorkunitsTime;
protected List<String> ignoreDataPathIdentifierList;
@@ -216,6 +221,9 @@ public class HiveSource implements Source {
this.maxLookBackTime = new
DateTime().minusDays(maxLookBackDays).getMillis();
this.ignoreDataPathIdentifierList =
COMMA_BASED_SPLITTER.splitToList(state.getProp(HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER_KEY,
DEFAULT_HIVE_SOURCE_IGNORE_DATA_PATH_IDENTIFIER));
+ this.partitionFilterGenerator =
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+ state.getProp(HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_KEY,
+ DEFAULT_HIVE_SOURCE_DATASET_FINDER_PARTITION_FILTER_CLASS),
state.getProperties());
silenceHiveLoggers();
}
@@ -300,17 +308,7 @@ public class HiveSource implements Source {
long tableProcessTime = new DateTime().getMillis();
this.watermarker.onTableProcessBegin(hiveDataset.getTable(),
tableProcessTime);
- Optional<String> partitionFilter = Optional.absent();
-
- // If the table is date partitioned, use the partition name to filter
partitions older than lookback
- if
(hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.PARTITION_COLUMN)
- &&
hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.DATETIME_FORMAT)
- &&
hiveDataset.getProperties().containsKey(LookbackPartitionFilterGenerator.LOOKBACK))
{
- partitionFilter =
- Optional.of(new
LookbackPartitionFilterGenerator(hiveDataset.getProperties()).getFilter(hiveDataset));
- log.info(String.format("Getting partitions for %s using partition filter
%s", hiveDataset.getTable()
- .getCompleteName(), partitionFilter.get()));
- }
+ Optional<String> partitionFilter =
Optional.fromNullable(this.partitionFilterGenerator.getFilter(hiveDataset));
List<Partition> sourcePartitions = HiveUtils.getPartitions(client.get(),
hiveDataset.getTable(), partitionFilter);
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java
new file mode 100644
index 0000000..395b40c
--- /dev/null
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGenerator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.copy.hive.filter;
+
+import java.util.Arrays;
+import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+
+
+/**
+ * Filters hive partitions using BETWEEN START AND END date range.
+ * Requires PARTITION_COLUMN, START_DATE, END_DATE
+ *
+ * <p>
+ * The generated filter is of the form "datePartition between 'start_date'
and 'end_date' ".
+ * </p>
+ */
+@Slf4j
+public class DateRangePartitionFilterGenerator implements
PartitionFilterGenerator {
+
+ public static final String PARTITION_COLUMN =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.column";
+ public static final String START_DATE =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.startdate";
+ public static final String END_DATE = HiveDatasetFinder.HIVE_DATASET_PREFIX
+ ".partition.filter.datetime.enddate";
+
+ private final Properties prop;
+
+ public DateRangePartitionFilterGenerator(Properties properties) {
+ this.prop = (properties == null) ? System.getProperties(): properties;
+ }
+
+ @Override
+ public String getFilter(HiveDataset hiveDataset) {
+
+ if (isValidConfig()) {
+ String partitionColumn = this.prop.getProperty(PARTITION_COLUMN);
+ String startDate = this.prop.getProperty(START_DATE);
+ String endDate = this.prop.getProperty(END_DATE);
+
+ String partitionFilter =String.format("%s between \"%s\" and \"%s\"",
partitionColumn, startDate, endDate);
+
+ log.info(String.format("Getting partitions for %s using partition filter
%s", ((hiveDataset == null) ? "null" : hiveDataset.getTable()
+ .getCompleteName()), partitionFilter));
+ return partitionFilter;
+ } else {
+ log.error(DateRangePartitionFilterGenerator.class.getName()
+ + " requires the following properties " + Arrays.toString(new
String[]{PARTITION_COLUMN, START_DATE, END_DATE}));
+
+ return null;
+ }
+ }
+
+ private boolean isValidConfig() {
+ return
this.prop.containsKey(DateRangePartitionFilterGenerator.PARTITION_COLUMN)
+ && this.prop.containsKey(DateRangePartitionFilterGenerator.START_DATE)
+ && this.prop.containsKey(DateRangePartitionFilterGenerator.END_DATE);
+ }
+}
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
index 6c80940..d271526 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGenerator.java
@@ -20,13 +20,12 @@ package org.apache.gobblin.data.management.copy.hive.filter;
import java.util.Arrays;
import java.util.Properties;
+import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
-import com.google.common.base.Preconditions;
-
import org.apache.gobblin.data.management.copy.hive.HiveDataset;
import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
@@ -41,34 +40,44 @@ import
org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
* must be such that lexycographical string and date ordering are compatible.
* </p>
*/
+@Slf4j
public class LookbackPartitionFilterGenerator implements
PartitionFilterGenerator {
public static final String PARTITION_COLUMN =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.column";
public static final String LOOKBACK = HiveDatasetFinder.HIVE_DATASET_PREFIX
+ ".partition.filter.datetime.lookback";
public static final String DATETIME_FORMAT =
HiveDatasetFinder.HIVE_DATASET_PREFIX + ".partition.filter.datetime.format";
- private static final String ERROR_MESSAGE =
LookbackPartitionFilterGenerator.class.getName()
- + " requires the following properties " + Arrays.toString(new
String[]{PARTITION_COLUMN, LOOKBACK, DATETIME_FORMAT});
-
- private final String partitionColumn;
- private final Period lookback;
- private final DateTimeFormatter formatter;
+ private final Properties prop;
public LookbackPartitionFilterGenerator(Properties properties) {
- Preconditions.checkArgument(properties.containsKey(PARTITION_COLUMN),
ERROR_MESSAGE);
- Preconditions.checkArgument(properties.containsKey(LOOKBACK),
ERROR_MESSAGE);
- Preconditions.checkArgument(properties.containsKey(DATETIME_FORMAT),
ERROR_MESSAGE);
-
- this.partitionColumn = properties.getProperty(PARTITION_COLUMN);
- this.lookback = Period.parse(properties.getProperty(LOOKBACK));
- this.formatter =
DateTimeFormat.forPattern(properties.getProperty(DATETIME_FORMAT));
+ this.prop = (properties == null) ? System.getProperties(): properties;
}
@Override
public String getFilter(HiveDataset hiveDataset) {
- DateTime limitDate = (new DateTime()).minus(this.lookback);
+ if (isValidConfig()) {
+ String partitionColumn = this.prop.getProperty(PARTITION_COLUMN);
+ Period lookback = Period.parse(this.prop.getProperty(LOOKBACK));
+ DateTimeFormatter formatter =
DateTimeFormat.forPattern(this.prop.getProperty(DATETIME_FORMAT));
+
+ DateTime limitDate = (new DateTime()).minus(lookback);
+
+ String partitionFilter = String.format("%s >= \"%s\"", partitionColumn,
formatter.print(limitDate));
+ log.info(String.format("Getting partitions for %s using partition filter
%s", ((hiveDataset == null) ? "null" : hiveDataset.getTable()
+ .getCompleteName()), partitionFilter));
+ return partitionFilter;
+ } else {
+ log.error(LookbackPartitionFilterGenerator.class.getName()
+ + " requires the following properties " + Arrays.toString(new
String[]{PARTITION_COLUMN, LOOKBACK, DATETIME_FORMAT}));
+
+ return null;
+ }
+ }
- return String.format("%s >= \"%s\"", this.partitionColumn,
this.formatter.print(limitDate));
+ private boolean isValidConfig() {
+ return
this.prop.containsKey(LookbackPartitionFilterGenerator.PARTITION_COLUMN)
+ &&
this.prop.containsKey(LookbackPartitionFilterGenerator.DATETIME_FORMAT)
+ && this.prop.containsKey(LookbackPartitionFilterGenerator.LOOKBACK);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
similarity index 55%
copy from
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
copy to
gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
index f354cd8..cd7ec43 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/DateRangePartitionFilterGeneratorTest.java
@@ -18,7 +18,8 @@
package org.apache.gobblin.data.management.copy.hive.filter;
import java.util.Properties;
-
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.testng.Assert;
@@ -26,8 +27,9 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+
@Test(groups = { "SystemTimeTests"})
-public class LookbackPartitionFilterGeneratorTest {
+public class DateRangePartitionFilterGeneratorTest {
@BeforeMethod
public void setUp()
@@ -42,20 +44,27 @@ public class LookbackPartitionFilterGeneratorTest {
}
@Test
+ public void testInitialization() {
+ PartitionFilterGenerator filter =
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+ DateRangePartitionFilterGenerator.class.getName(),
System.getProperties());
+ Assert.assertTrue(filter instanceof DateRangePartitionFilterGenerator);
+ }
+ @Test
public void test() {
- doTest("datePartition", "P1D", "YYYY-MM-dd-HH", "datePartition >=
\"2016-03-14-10\"");
- doTest("datePartition", "P2D", "YYYY-MM-dd-HH", "datePartition >=
\"2016-03-13-10\"");
- doTest("datePartition", "PT4H", "YYYY-MM-dd-HH", "datePartition >=
\"2016-03-15-06\"");
- doTest("myColumn", "PT4H", "YYYY-MM-dd-HH", "myColumn >=
\"2016-03-15-06\"");
+ doTest("datePartition", "2020-01-01", "2020-01-10", "datePartition between
\"2020-01-01\" and \"2020-01-10\"");
}
- private void doTest(String column, String lookback, String format, String
expected) {
+
+ private void doTest(String column, String sDate, String eDate, String
expected) {
Properties properties = new Properties();
- properties.put(LookbackPartitionFilterGenerator.PARTITION_COLUMN, column);
- properties.put(LookbackPartitionFilterGenerator.LOOKBACK, lookback);
- properties.put(LookbackPartitionFilterGenerator.DATETIME_FORMAT, format);
+ properties.put(DateRangePartitionFilterGenerator.PARTITION_COLUMN, column);
+ properties.put(DateRangePartitionFilterGenerator.START_DATE, sDate);
+ properties.put(DateRangePartitionFilterGenerator.END_DATE, eDate);
+
+ PartitionFilterGenerator filterImpl =
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+ DateRangePartitionFilterGenerator.class.getName(), properties);
- Assert.assertEquals(new
LookbackPartitionFilterGenerator(properties).getFilter(null), expected);
+ Assert.assertEquals(filterImpl.getFilter(null), expected);
}
}
diff --git
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
index f354cd8..92181b6 100644
---
a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
+++
b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/hive/filter/LookbackPartitionFilterGeneratorTest.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.data.management.copy.hive.filter;
import java.util.Properties;
+import org.apache.gobblin.data.management.copy.hive.PartitionFilterGenerator;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.testng.Assert;
@@ -42,6 +44,12 @@ public class LookbackPartitionFilterGeneratorTest {
}
@Test
+ public void testInitialization() {
+ PartitionFilterGenerator filter =
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+ LookbackPartitionFilterGenerator.class.getName(),
System.getProperties());
+ Assert.assertTrue(filter instanceof LookbackPartitionFilterGenerator);
+ }
+ @Test
public void test() {
doTest("datePartition", "P1D", "YYYY-MM-dd-HH", "datePartition >=
\"2016-03-14-10\"");
doTest("datePartition", "P2D", "YYYY-MM-dd-HH", "datePartition >=
\"2016-03-13-10\"");
@@ -55,7 +63,10 @@ public class LookbackPartitionFilterGeneratorTest {
properties.put(LookbackPartitionFilterGenerator.LOOKBACK, lookback);
properties.put(LookbackPartitionFilterGenerator.DATETIME_FORMAT, format);
- Assert.assertEquals(new
LookbackPartitionFilterGenerator(properties).getFilter(null), expected);
+ PartitionFilterGenerator filterImpl =
GobblinConstructorUtils.invokeConstructor(PartitionFilterGenerator.class,
+ LookbackPartitionFilterGenerator.class.getName(), properties);
+
+ Assert.assertEquals(filterImpl.getFilter(null), expected);
}
}