This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ee17b1576 [GOBBLIN-1843] Utility for detecting non optional unions
should convert dataset urn to hive compatible format (#3705)
ee17b1576 is described below
commit ee17b1576759c0669bf7bc46c9faf6868544ceb9
Author: Matthew Ho <[email protected]>
AuthorDate: Thu Jun 15 11:34:43 2023 -0700
[GOBBLIN-1843] Utility for detecting non optional unions should convert
dataset urn to hive compatible format (#3705)
* [GOBBLIN-1843] Utility for detecting non optional unions should convert
dataset urn to hive compatible format
* Logs for dataset decorator and hivemetastore utils
* Add util method for converting topic to table name
---
.../apache/gobblin/compaction/hive/HiveTable.java | 3 +-
.../dataset/DatasetsFinderFilteringDecorator.java | 5 ++++
.../gobblin/hive/metastore/HiveMetaStoreUtils.java | 16 +++++++++--
.../DatasetHiveSchemaContainsNonOptionalUnion.java | 3 +-
...asetHiveSchemaContainsNonOptionalUnionTest.java | 33 +++++++++++++---------
5 files changed, 42 insertions(+), 18 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
index 1ff182650..7880de2d2 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/hive/HiveTable.java
@@ -27,6 +27,7 @@ import org.apache.commons.lang.StringUtils;
import com.google.common.base.Splitter;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.util.HiveJdbcConnector;
@@ -42,7 +43,7 @@ public abstract class HiveTable {
protected List<HiveAttribute> attributes;
public static class Builder<T extends Builder<?>> {
- protected String name = UUID.randomUUID().toString().replaceAll("-", "_");
+ protected String name =
HiveMetaStoreUtils.getHiveTableName(UUID.randomUUID().toString());
protected List<String> primaryKeys = new ArrayList<>();
protected List<HiveAttribute> attributes = new ArrayList<>();
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
index 7c7f63799..46dbdc800 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/dataset/DatasetsFinderFilteringDecorator.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.fs.Path;
import com.google.common.annotations.VisibleForTesting;
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.util.PropertiesUtils;
@@ -39,6 +41,7 @@ import
org.apache.gobblin.util.function.CheckedExceptionPredicate;
/**
* A decorator for filtering datasets after a {@link DatasetsFinder} finds a
{@link List} of {@link Dataset}s
*/
+@Slf4j
public class DatasetsFinderFilteringDecorator<T extends Dataset> implements
DatasetsFinder<T> {
private static final String PREFIX = "filtering.datasets.finder.";
public static final String DATASET_CLASS = PREFIX + "class";
@@ -69,6 +72,7 @@ public class DatasetsFinderFilteringDecorator<T extends
Dataset> implements Data
@Override
public List<T> findDatasets() throws IOException {
List<T> datasets = datasetFinder.findDatasets();
+ log.info("Found {} datasets", datasets.size());
List<T> allowedDatasets = Collections.emptyList();
try {
allowedDatasets = datasets.parallelStream()
@@ -83,6 +87,7 @@ public class DatasetsFinderFilteringDecorator<T extends
Dataset> implements Data
wrappedIOException.rethrowWrapped();
}
+ log.info("Allowed {}/{} datasets", allowedDatasets.size()
,datasets.size());
return allowedDatasets;
}
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
index f7dac0c08..7603dfa7e 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtils.java
@@ -29,8 +29,6 @@ import java.util.stream.Stream;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.commons.lang.reflect.MethodUtils;
-import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
-import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -73,6 +71,8 @@ import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveRegistrationUnit.Column;
import org.apache.gobblin.hive.HiveTable;
import org.apache.gobblin.hive.SharedHiveConfKey;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
+import org.apache.gobblin.hive.spec.HiveSpec;
/**
@@ -151,6 +151,15 @@ public class HiveMetaStoreUtils {
return hiveTable;
}
+ /**
+ * Hive does not use '-' or '.' in the table name, so they are replaced with
'_'
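+ * @param topic the topic name to convert into a Hive table name
+ * @return the topic name with every '-' and '.' replaced by '_'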
+ */
+ public static String getHiveTableName(String topic) {
+ return topic.replaceAll("[-.]", "_");
+ }
+
/**
* Convert a {@link HivePartition} into a {@link Partition}.
*/
@@ -289,7 +298,8 @@ public class HiveMetaStoreUtils {
.anyMatch(type -> isNonOptionalUnion(type));
}
- throw new RuntimeException("Avro based Hive tables without \"" +
HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported");
+ throw new RuntimeException("Avro based Hive tables without \"" +
HiveAvroSerDeManager.SCHEMA_LITERAL +"\" are not supported. "
+ + "hiveTable=" + hiveTable.getDbName() + "." +
hiveTable.getTableName());
}
/**
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
index 655c764c7..c74afaeec 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -76,7 +76,8 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T
extends Dataset> implem
throw new IllegalStateException(String.format("Dataset urn [%s] doesn't
follow expected pattern. " +
"Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
}
- return new DbAndTable(m.group(1), m.group(2));
+
+ return new DbAndTable(m.group(1),
HiveMetaStoreUtils.getHiveTableName(m.group(2)));
}
boolean containsNonOptionalUnion(HiveTable table) {
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
index 33cee4929..14041cdec 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -17,17 +17,10 @@
package org.apache.gobblin.iceberg.predicates;
-import com.google.common.io.Files;
import java.io.File;
import java.util.Collections;
-import lombok.extern.slf4j.Slf4j;
+
import org.apache.commons.io.FileUtils;
-import org.apache.gobblin.configuration.State;
-import org.apache.gobblin.dataset.Dataset;
-import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
-import org.apache.gobblin.hive.HiveTable;
-import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
-import org.apache.gobblin.util.ConfigUtils;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -40,16 +33,30 @@ import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;
+import com.google.common.io.Files;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.dataset.Dataset;
+import org.apache.gobblin.dataset.test.SimpleDatasetForTesting;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
+import org.apache.gobblin.util.ConfigUtils;
+
+
@Slf4j
+// Depends on icebergMetadataWriterTest to avoid running concurrently with other
HiveMetastoreTest(s) in CI.
+// You can comment out the dependsOnGroups if you want to test this class in
isolation.
@Test(dependsOnGroups = "icebergMetadataWriterTest")
public class DatasetHiveSchemaContainsNonOptionalUnionTest extends
HiveMetastoreTest {
- private static String dbName = "dbname_" +
-
DatasetHiveSchemaContainsNonOptionalUnionTest.class.getSimpleName().toLowerCase();
+ private static String dbName = "dbName";
private static File tmpDir;
private static State state;
private static String dbUri;
- private static String testTable = "test_table";
+ private static String testTable = "test_table01";
+ private static String datasetUrn =
String.format("/data/%s/streaming/test-Table01/hourly/2023/01/01", dbName);
@AfterSuite
public void clean() throws Exception {
@@ -77,14 +84,14 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest
extends HiveMetastore
metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
- state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN,
"/data/(\\w+)/(\\w+)");
+ state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN,
"/data/(\\w+)/.*/([\\w\\d_-]+)/hourly.*");
Assert.assertNotNull(metastoreClient.getTable(dbName,
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
}
@Test
public void testContainsNonOptionalUnion() throws Exception {
DatasetHiveSchemaContainsNonOptionalUnion predicate = new
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
- Dataset dataset = new SimpleDatasetForTesting("/data/" + dbName + "/" +
testTable);
+ Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
Assert.assertTrue(predicate.test(dataset));
}