Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bfc250fac -> 695863e82
[GOBBLIN-343] Fix for db/table regexp in HiveRegistrationPolicyBase

Closes #2199 from treff7es/table_regexp_fix

Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/695863e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/695863e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/695863e8

Branch: refs/heads/master
Commit: 695863e82cb1bf9c8cb0ca0820b05ede189fbd7b
Parents: bfc250f
Author: treff7es <[email protected]>
Authored: Tue Dec 12 01:28:38 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Dec 12 01:28:38 2017 -0800

----------------------------------------------------------------------
 .../hive/policy/HiveRegistrationPolicyBase.java | 30 +++++----
 .../policy/HiveRegistrationPolicyBaseTest.java  | 64 ++++++++++++++++++--
 2 files changed, 78 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/695863e8/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index 3274fa1..d03e8b9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -17,21 +17,11 @@
 
 package org.apache.gobblin.hive.policy;
 
-import com.codahale.metrics.Timer;
-import com.google.common.base.Splitter;
-import com.typesafe.config.Config;
-import org.apache.gobblin.config.client.ConfigClient;
-import 
org.apache.gobblin.config.client.api.VersionStabilityPolicy; -import org.apache.gobblin.hive.HiveRegister; -import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; -import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils; -import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource; import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.commons.lang3.StringUtils; @@ -41,20 +31,31 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.TableType; +import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.base.Strings; import com.google.common.collect.Lists; +import com.typesafe.config.Config; import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.config.client.ConfigClient; +import org.apache.gobblin.config.client.api.VersionStabilityPolicy; import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.hive.HivePartition; import org.apache.gobblin.hive.HiveRegProps; +import org.apache.gobblin.hive.HiveRegister; import org.apache.gobblin.hive.HiveSerDeManager; import org.apache.gobblin.hive.HiveTable; +import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils; import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.gobblin.hive.spec.SimpleHiveSpec; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource; /** @@ -271,7 +272,12 @@ public 
class HiveRegistrationPolicyBase implements HiveRegistrationPolicy { if (this.props.contains(nameKey)) { name = this.props.getProp(nameKey); } else if (pattern.isPresent()) { - name = pattern.get().matcher(path.toString()).group(); + Matcher matcher = pattern.get().matcher(path.toString()); + if (matcher.matches() && matcher.groupCount() >= 1) { + name = matcher.group(1); + } else { + throw new IllegalStateException("No group match found for regexKey " + regexKey+" with regexp "+ pattern.get().toString() +" on path "+path); + } } else { throw new IllegalStateException("Missing required property " + nameKey + " or " + regexKey); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/695863e8/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java ---------------------------------------------------------------------- diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java index 6eda1d2..95de98a 100644 --- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java +++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java @@ -20,27 +20,30 @@ package org.apache.gobblin.hive.policy; import java.io.IOException; import java.util.Collection; import java.util.Iterator; +import java.util.regex.Pattern; import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import com.google.common.base.Optional; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.hive.spec.HiveSpec; import org.apache.gobblin.hive.spec.SimpleHiveSpec; - /** * Unit test for {@link HiveRegistrationPolicyBase} * * @author Ziyang Liu */ -@Test(groups = { "gobblin.hive" }) +@Test(groups = {"gobblin.hive"}) public class 
HiveRegistrationPolicyBaseTest { private Path path; @Test - public void testGetHiveSpecs() throws IOException { + public void testGetHiveSpecs() + throws IOException { State state = new State(); state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, "db1"); state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES, "db2"); @@ -69,7 +72,8 @@ public class HiveRegistrationPolicyBaseTest { } @Test - public void testGetHiveSpecsWithDBFilter() throws IOException{ + public void testGetHiveSpecsWithDBFilter() + throws IOException { State state = new State(); state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, "db1"); state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES, "db2"); @@ -101,6 +105,58 @@ public class HiveRegistrationPolicyBaseTest { examine(spec, "db2", "tbl5"); } + @Test + public void testTableRegexp() + throws IOException { + State state = new State(); + String regexp = ".*test_bucket/(.*)/staging/.*"; + Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp)); + Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/"); + + state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, regexp); + + HiveRegistrationPolicyBase registrationPolicyBase = new HiveRegistrationPolicyBase(state); + + String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern ); + + Assert.assertEquals(resultTable, "topic"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testTableRegexpWithoutGroupShouldFail() + throws IOException { + State state = new State(); + String regexp = ".*test_bucket/.*/staging/.*"; + Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp)); + Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/"); + + state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, regexp); + + 
HiveRegistrationPolicyBase registrationPolicyBase = new HiveRegistrationPolicyBase(state); + + String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern ); + + Assert.assertEquals(resultTable, "topic"); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void testTableRegexpWithoutMatchShouldFail() + throws IOException { + State state = new State(); + String regexp = "^hdfs://(.*)"; + Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp)); + Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/"); + + state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, regexp); + + HiveRegistrationPolicyBase registrationPolicyBase = new HiveRegistrationPolicyBase(state); + + String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern ); + + Assert.assertEquals(resultTable, "topic"); + } + + private static void examine(HiveSpec spec, String dbName, String tableName) { Assert.assertEquals(spec.getClass(), SimpleHiveSpec.class); Assert.assertEquals(spec.getTable().getDbName(), dbName);
