This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new ee17b1576 [GOBBLIN-1843] Utility for detecting non optional unions 
should convert dataset urn to hive compatible format (#3705)
ee17b1576 is described below

commit ee17b1576759c0669bf7bc46c9faf6868544ceb9
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Jun 15 11:34:43 2023 -0700

    [GOBBLIN-1843] Utility for detecting non optional unions should convert 
dataset urn to hive compatible format (#3705)
    
    * [GOBBLIN-1843] Utility for detecting non optional unions should convert 
dataset urn to hive compatible format
    
    * Logs for dataset decorator and hivemetastore utils
    
    * Add util method for converting topic to table name
---
 .../apache/gobblin/compaction/hive/HiveTable.java  |  3 +-
 .../dataset/DatasetsFinderFilteringDecorator.java  |  5 ++++
 .../gobblin/hive/metastore/HiveMetaStoreUtils.java | 16 +++++++++--
 .../DatasetHiveSchemaContainsNonOptionalUnion.java |  3 +-
 ...asetHiveSchemaContainsNonOptionalUnionTest.java | 33 +++++++++++++---------
 5 files changed, 42 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
index 1ff182650..7880de2d2 100644
--- 
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
+++ 
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang.StringUtils;
 
 import com.google.common.base.Splitter;
 
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.util.HiveJdbcConnector;
 
 
@@ -42,7 +43,7 @@ public abstract class HiveTable {
   protected List<HiveAttribute> attributes;
 
   public static class Builder<T extends Builder<?>> {
-    protected String name = UUID.randomUUID().toString().replaceAll("-", "_");
+    protected String name = 
HiveMetaStoreUtils.getHiveTableName(UUID.randomUUID().toString());
     protected List<String> primaryKeys = new ArrayList<>();
     protected List<HiveAttribute> attributes = new ArrayList<>();
 
diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
index 7c7f63799..46dbdc800 100644
--- 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
+++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
 
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.gobblin.dataset.Dataset;
 import org.apache.gobblin.dataset.DatasetsFinder;
 import org.apache.gobblin.util.PropertiesUtils;
@@ -39,6 +41,7 @@ import 
org.apache.gobblin.util.function.CheckedExceptionPredicate;
 /**
  * A decorator for filtering datasets after a {@link DatasetsFinder} finds a 
{@link List} of {@link Dataset}s
  */
+@Slf4j
 public class DatasetsFinderFilteringDecorator<T extends Dataset> implements 
DatasetsFinder<T> {
   private static final String PREFIX = "filtering.datasets.finder.";
   public static final String DATASET_CLASS = PREFIX + "class";
@@ -69,6 +72,7 @@ public class DatasetsFinderFilteringDecorator<T extends 
Dataset> implements Data
   @Override
   public List<T> findDatasets() throws IOException {
     List<T> datasets = datasetFinder.findDatasets();
+    log.info("Found {} datasets", datasets.size());
     List<T> allowedDatasets = Collections.emptyList();
     try {
       allowedDatasets = datasets.parallelStream()
@@ -83,6 +87,7 @@ public class DatasetsFinderFilteringDecorator<T extends 
Dataset> implements Data
       wrappedIOException.rethrowWrapped();
     }
 
+    log.info("Allowed {}/{} datasets", allowedDatasets.size() 
,datasets.size());
     return allowedDatasets;
   }
 
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index f7dac0c08..7603dfa7e 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -29,8 +29,6 @@ import java.util.stream.Stream;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
-import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,6 +71,8 @@ import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.hive.SharedHiveConfKey;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
+import org.apache.gobblin.hive.spec.HiveSpec;
 
 
 /**
@@ -151,6 +151,15 @@ public class HiveMetaStoreUtils {
     return hiveTable;
   }
 
+  /**
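+   * Hive does not use '-' or '.' in table names, so each occurrence is replaced with '_'.
+   * @param topic the name to convert, e.g. a topic name
+   * @return the given name with every '-' and '.' replaced by '_'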
+   */
+  public static String getHiveTableName(String topic) {
+    return topic.replaceAll("[-.]", "_");
+  }
+
   /**
    * Convert a {@link HivePartition} into a {@link Partition}.
    */
@@ -289,7 +298,8 @@ public class HiveMetaStoreUtils {
           .anyMatch(type -> isNonOptionalUnion(type));
     }
 
-    throw new RuntimeException("Avro based Hive tables without \"" + 
HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported");
+    throw new RuntimeException("Avro based Hive tables without \"" + 
HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported. "
+        + "hiveTable=" + hiveTable.getDbName() + "." + 
hiveTable.getTableName());
   }
 
   /**
diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
index 655c764c7..c74afaeec 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -76,7 +76,8 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T 
extends Dataset> implem
       throw new IllegalStateException(String.format("Dataset urn [%s] doesn't 
follow expected pattern. " +
       "Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
     }
-    return new DbAndTable(m.group(1), m.group(2));
+
+    return new DbAndTable(m.group(1), 
HiveMetaStoreUtils.getHiveTableName(m.group(2)));
   }
 
   boolean containsNonOptionalUnion(HiveTable table) {
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
index 33cee4929..14041cdec 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -17,17 +17,10 @@
 
 package org.apache.gobblin.iceberg.predicates;
 
-import com.google.common.io.Files;
 import java.io.File;
 import java.util.Collections;
-import lombok.extern.slf4j.Slf4j;
+
 import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
-import org.apache.gobblin.hive.HiveTable;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
-import org.apache.gobblin.util.ConfigUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -40,16 +33,30 @@ import org.testng.annotations.AfterSuite;
 import org.testng.annotations.BeforeSuite;
 import org.testng.annotations.Test;
 
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
 @Slf4j
+// Depends on icebergMetadataWriterTest to avoid running concurrently with other HiveMetastoreTest(s) in CI.
+// You can comment out the dependsOnGroups if you want to test this class in isolation.
 @Test(dependsOnGroups = "icebergMetadataWriterTest")
 public class DatasetHiveSchemaContainsNonOptionalUnionTest extends 
HiveMetastoreTest {
 
-  private static String dbName = "dbname_" +
-      
DatasetHiveSchemaContainsNonOptionalUnionTest.class.getSimpleName().toLowerCase();
+  private static String dbName = "dbName";
   private static File tmpDir;
   private static State state;
   private static String dbUri;
-  private static String testTable = "test_table";
+  private static String testTable = "test_table01";
+  private static String datasetUrn = 
String.format("/data/%s/streaming/test-Table01/hourly/2023/01/01", dbName);
 
   @AfterSuite
   public void clean() throws Exception {
@@ -77,14 +84,14 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest 
extends HiveMetastore
     metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
 
     state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
-    state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, 
"/data/(\\w+)/(\\w+)");
+    state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, 
"/data/(\\w+)/.*/([\\w\\d_-]+)/hourly.*");
     Assert.assertNotNull(metastoreClient.getTable(dbName, 
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
   }
 
   @Test
   public void testContainsNonOptionalUnion() throws Exception {
     DatasetHiveSchemaContainsNonOptionalUnion predicate = new 
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
-    Dataset dataset = new SimpleDatasetForTesting("/data/" + dbName + "/" + 
testTable);
+    Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
     Assert.assertTrue(predicate.test(dataset));
   }
 

Reply via email to