This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 78c8891402 [GOBBLIN-2087] Enhance
DatasetHiveSchemaContainsNonOptionalUnion to support Optional database name
(#3972)
78c8891402 is described below
commit 78c889140266d27bf5826e5c5b78b045b0bcb1f1
Author: pawanbtej <[email protected]>
AuthorDate: Tue Jul 30 10:16:47 2024 -0700
[GOBBLIN-2087] Enhance DatasetHiveSchemaContainsNonOptionalUnion to support
Optional database name (#3972)
* GOBBLIN-2087 Enhance DatasetHiveSchemaContainsNonOptionalUnion to support
Optional database name
---------
Co-authored-by: preddy_LinkedIn <[email protected]>
---
.../DatasetHiveSchemaContainsNonOptionalUnion.java | 22 ++++++++++-
...asetHiveSchemaContainsNonOptionalUnionTest.java | 45 ++++++++++++++++++++--
2 files changed, 62 insertions(+), 5 deletions(-)
diff --git
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
index c74afaeec2..391ca553d2 100644
---
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
+++
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -42,16 +42,20 @@ import
org.apache.gobblin.util.function.CheckedExceptionPredicate;
public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset>
implements CheckedExceptionPredicate<T, IOException> {
private final HiveRegister hiveRegister;
private final Pattern pattern;
+ private final Optional<String> optionalDbName;
+
public static final String PREFIX =
DatasetHiveSchemaContainsNonOptionalUnion.class.getName();
/**
* 1st match group is assumed to be the DB and the 2nd match group the Table
for the pattern
*/
public static final String PATTERN = PREFIX + ".db.table.pattern";
+ public static final String OPTIONAL_DB_NAME = PREFIX + ".db.optionalDbName";
public DatasetHiveSchemaContainsNonOptionalUnion(Properties properties) {
this.hiveRegister = getHiveRegister(new State(properties));
this.pattern = Pattern.compile(properties.getProperty(PATTERN));
+ this.optionalDbName =
Optional.fromNullable(properties.getProperty(OPTIONAL_DB_NAME));
}
@Override
@@ -67,7 +71,15 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T
extends Dataset> implem
private Optional<HiveTable> getTable(T dataset) throws IOException {
DbAndTable dbAndTable = getDbAndTable(dataset);
- return this.hiveRegister.getTable(dbAndTable.getDb(),
dbAndTable.getTable());
+ log.info("Checking for table in DB: {} and Table: {}", dbAndTable.getDb(),
dbAndTable.getTable());
+ Optional<HiveTable> hiveTable =
this.hiveRegister.getTable(dbAndTable.getDb(), dbAndTable.getTable());
+
+ if (hiveTable.isPresent()) {
+ log.info("Table found in DB: {} and Table: {}. Exiting execution.",
dbAndTable.getDb(), dbAndTable.getTable());
+ } else {
+ log.info("No table found in DB: {} and Table: {}.", dbAndTable.getDb(),
dbAndTable.getTable());
+ }
+ return hiveTable;
}
private DbAndTable getDbAndTable(T dataset) {
@@ -77,7 +89,13 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T
extends Dataset> implem
"Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
}
- return new DbAndTable(m.group(1),
HiveMetaStoreUtils.getHiveTableName(m.group(2)));
+ String db = optionalDbName.or(m.group(1));
+ if (optionalDbName.isPresent()) {
+ log.info("DB name from pattern: {}. Replacing with provided DB name:
{}", m.group(1), optionalDbName.get());
+ }
+
+ String table = HiveMetaStoreUtils.getHiveTableName(m.group(2));
+ return new DbAndTable(db, table);
}
boolean containsNonOptionalUnion(HiveTable table) {
diff --git
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
index 5436b5e6d8..a0608e64ed 100644
---
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
+++
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -56,8 +56,12 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest
extends HiveMetastore
private static File tmpDir;
private static State state;
private static String dbUri;
+
+ private static String dbOptionalUri;
+
private static String testTable = "test_table01";
private static String datasetUrn =
String.format("/data/%s/streaming/test-Table01/hourly/2023/01/01", dbName);
+ private static String optionalDbName = "test_hourly";
@AfterSuite
public void clean() throws Exception {
@@ -84,9 +88,22 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest
extends HiveMetastore
final String avroSchema = "{\"type\": \"record\", \"name\":
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": "
+ "[{\"name\":\"fieldName\", \"type\": %s}]}";
serdeProps.setProp("avro.schema.literal", String.format(avroSchema,
"[\"string\", \"int\"]"));
- HiveTable testTable = createTestHiveTable_Avro(serdeProps);
+ // Create table in main database
+ HiveTable testTable = createTestHiveTable_Avro(dbName, serdeProps);
metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
+ // Create optional database
+ dbOptionalUri = String.format("%s/%s/%s",
tmpDir.getAbsolutePath(),"metastore", optionalDbName);
+ try {
+ metastoreClient.getDatabase(optionalDbName);
+ } catch (NoSuchObjectException e) {
+ metastoreClient.createDatabase(
+ new Database(optionalDbName, "optional database", dbOptionalUri,
Collections.emptyMap()));
+ }
+ // Create table in optional database
+ HiveTable optionalTestTable = createTestHiveTable_Avro(optionalDbName,
serdeProps);
+
metastoreClient.createTable(HiveMetaStoreUtils.getTable(optionalTestTable));
+
state =
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN,
"/data/(\\w+)/.*/([\\w\\d_-]+)/hourly.*");
Assert.assertNotNull(metastoreClient.getTable(dbName,
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
@@ -99,13 +116,35 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest
extends HiveMetastore
Assert.assertTrue(predicate.test(dataset));
}
- private HiveTable createTestHiveTable_Avro(State props) {
+ @Test
+ public void testContainsNonOptionalUnionWithOptionalDbName() throws
Exception {
+ state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME,
"test_hourly");
+ DatasetHiveSchemaContainsNonOptionalUnion predicate = new
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+ Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
+ try {
+ Assert.assertTrue(predicate.test(dataset));
+ } finally {
+ // Clean up the property
+
state.removeProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME);
+ }
+ }
+
+
+ @Test
+ public void testContainsNonOptionalUnionWithoutOptionalDbName() throws
Exception {
+ // Ensure OPTIONAL_DB_NAME is not set
+
state.removeProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME);
+ DatasetHiveSchemaContainsNonOptionalUnion predicate = new
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+ Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
+ Assert.assertTrue(predicate.test(dataset));
+ }
+
+ private HiveTable createTestHiveTable_Avro(String dbName, State props) {
HiveTable.Builder builder = new HiveTable.Builder();
HiveTable hiveTable =
builder.withDbName(dbName).withTableName(testTable).withProps(props).build();
hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
hiveTable.setSerDeType(AvroSerDe.class.getName());
-
// Serialize then deserialize as a way to quickly setup table object
Table table = HiveMetaStoreUtils.getTable(hiveTable);
return HiveMetaStoreUtils.getHiveTable(table);