Repository: incubator-gobblin
Updated Branches:
  refs/heads/master bfc250fac -> 695863e82


[GOBBLIN-343] Fix for db/table regexp in HiveRegistrationPolicyBase

Closes #2199 from treff7es/table_regexp_fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/695863e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/695863e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/695863e8

Branch: refs/heads/master
Commit: 695863e82cb1bf9c8cb0ca0820b05ede189fbd7b
Parents: bfc250f
Author: treff7es <[email protected]>
Authored: Tue Dec 12 01:28:38 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Dec 12 01:28:38 2017 -0800

----------------------------------------------------------------------
 .../hive/policy/HiveRegistrationPolicyBase.java | 30 +++++----
 .../policy/HiveRegistrationPolicyBaseTest.java  | 64 ++++++++++++++++++--
 2 files changed, 78 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/695863e8/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
index 3274fa1..d03e8b9 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBase.java
@@ -17,21 +17,11 @@
 
 package org.apache.gobblin.hive.policy;
 
-import com.codahale.metrics.Timer;
-import com.google.common.base.Splitter;
-import com.typesafe.config.Config;
-import org.apache.gobblin.config.client.ConfigClient;
-import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
-import org.apache.gobblin.hive.HiveRegister;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
-import org.apache.gobblin.instrumented.Instrumented;
-import org.apache.gobblin.metrics.MetricContext;
-import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
-import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Collection;
 import java.util.List;
+import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.StringUtils;
@@ -41,20 +31,31 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.TableType;
 
+import com.codahale.metrics.Timer;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.config.client.ConfigClient;
+import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.HiveRegProps;
+import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveSerDeManager;
 import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.source.extractor.extract.kafka.ConfigStoreUtils;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 
 
 /**
@@ -271,7 +272,12 @@ public class HiveRegistrationPolicyBase implements HiveRegistrationPolicy {
     if (this.props.contains(nameKey)) {
       name = this.props.getProp(nameKey);
     } else if (pattern.isPresent()) {
-      name = pattern.get().matcher(path.toString()).group();
+      Matcher matcher = pattern.get().matcher(path.toString());
+      if (matcher.matches() && matcher.groupCount() >= 1) {
+        name = matcher.group(1);
+      } else {
+        throw new IllegalStateException("No group match found for regexKey " + 
regexKey+" with regexp "+ pattern.get().toString() +" on path "+path);
+      }
     } else {
       throw new IllegalStateException("Missing required property " + nameKey + 
" or " + regexKey);
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/695863e8/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
----------------------------------------------------------------------
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
index 6eda1d2..95de98a 100644
--- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/policy/HiveRegistrationPolicyBaseTest.java
@@ -20,27 +20,30 @@ package org.apache.gobblin.hive.policy;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.regex.Pattern;
 
 import org.apache.hadoop.fs.Path;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import com.google.common.base.Optional;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.spec.SimpleHiveSpec;
 
-
 /**
  * Unit test for {@link HiveRegistrationPolicyBase}
  *
  * @author Ziyang Liu
  */
-@Test(groups = { "gobblin.hive" })
+@Test(groups = {"gobblin.hive"})
 public class HiveRegistrationPolicyBaseTest {
   private Path path;
 
   @Test
-  public void testGetHiveSpecs() throws IOException {
+  public void testGetHiveSpecs()
+      throws IOException {
     State state = new State();
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
"db1");
     
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
 "db2");
@@ -69,7 +72,8 @@ public class HiveRegistrationPolicyBaseTest {
   }
 
   @Test
-  public void testGetHiveSpecsWithDBFilter() throws IOException{
+  public void testGetHiveSpecsWithDBFilter()
+      throws IOException {
     State state = new State();
     state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
"db1");
     
state.appendToListProp(HiveRegistrationPolicyBase.ADDITIONAL_HIVE_DATABASE_NAMES,
 "db2");
@@ -101,6 +105,58 @@ public class HiveRegistrationPolicyBaseTest {
     examine(spec, "db2", "tbl5");
   }
 
+  @Test
+  public void testTableRegexp()
+      throws IOException {
+    State state = new State();
+    String regexp = ".*test_bucket/(.*)/staging/.*";
+    Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp));
+    Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/");
+
+    state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, 
regexp);
+
+    HiveRegistrationPolicyBase registrationPolicyBase = new 
HiveRegistrationPolicyBase(state);
+
+    String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, 
HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern );
+
+    Assert.assertEquals(resultTable, "topic");
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testTableRegexpWithoutGroupShouldFail()
+      throws IOException {
+    State state = new State();
+    String regexp = ".*test_bucket/.*/staging/.*";
+    Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp));
+    Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/");
+
+    state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, 
regexp);
+
+    HiveRegistrationPolicyBase registrationPolicyBase = new 
HiveRegistrationPolicyBase(state);
+
+    String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, 
HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern );
+
+    Assert.assertEquals(resultTable, "topic");
+  }
+
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testTableRegexpWithoutMatchShouldFail()
+      throws IOException {
+    State state = new State();
+    String regexp = "^hdfs://(.*)";
+    Optional<Pattern> pattern = Optional.of(Pattern.compile(regexp));
+    Path path = new Path("s3://test_bucket/topic/staging/2017-10-21/");
+
+    state.appendToListProp(HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, 
regexp);
+
+    HiveRegistrationPolicyBase registrationPolicyBase = new 
HiveRegistrationPolicyBase(state);
+
+    String resultTable = registrationPolicyBase.getDatabaseOrTableName(path, 
HiveRegistrationPolicyBase.HIVE_DATABASE_NAME, 
HiveRegistrationPolicyBase.HIVE_DATABASE_REGEX, pattern );
+
+    Assert.assertEquals(resultTable, "topic");
+  }
+
+
   private static void examine(HiveSpec spec, String dbName, String tableName) {
     Assert.assertEquals(spec.getClass(), SimpleHiveSpec.class);
     Assert.assertEquals(spec.getTable().getDbName(), dbName);

Reply via email to