This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c8891402 [GOBBLIN-2087] Enhance 
DatasetHiveSchemaContainsNonOptionalUnion to support Optional database name 
(#3972)
78c8891402 is described below

commit 78c889140266d27bf5826e5c5b78b045b0bcb1f1
Author: pawanbtej <[email protected]>
AuthorDate: Tue Jul 30 10:16:47 2024 -0700

    [GOBBLIN-2087] Enhance DatasetHiveSchemaContainsNonOptionalUnion to support 
Optional database name (#3972)
    
    * GOBBLIN-2087 Enhance DatasetHiveSchemaContainsNonOptionalUnion to support 
Optional database name
    
    ---------
    
    Co-authored-by: preddy_LinkedIn <[email protected]>
---
 .../DatasetHiveSchemaContainsNonOptionalUnion.java | 22 ++++++++++-
 ...asetHiveSchemaContainsNonOptionalUnionTest.java | 45 ++++++++++++++++++++--
 2 files changed, 62 insertions(+), 5 deletions(-)

diff --git 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
index c74afaeec2..391ca553d2 100644
--- 
a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
+++ 
b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnion.java
@@ -42,16 +42,20 @@ import 
org.apache.gobblin.util.function.CheckedExceptionPredicate;
 public class DatasetHiveSchemaContainsNonOptionalUnion<T extends Dataset> 
implements CheckedExceptionPredicate<T, IOException> {
   private final HiveRegister hiveRegister;
   private final Pattern pattern;
+  private final Optional<String> optionalDbName;
+
 
   public static final String PREFIX = 
DatasetHiveSchemaContainsNonOptionalUnion.class.getName();
   /**
    * 1st match group is assumed to be the DB and the 2nd match group the Table 
for the pattern
    */
   public static final String PATTERN = PREFIX + ".db.table.pattern";
+  public static final String OPTIONAL_DB_NAME = PREFIX + ".db.optionalDbName";
 
   public DatasetHiveSchemaContainsNonOptionalUnion(Properties properties) {
     this.hiveRegister = getHiveRegister(new State(properties));
     this.pattern = Pattern.compile(properties.getProperty(PATTERN));
+    this.optionalDbName = 
Optional.fromNullable(properties.getProperty(OPTIONAL_DB_NAME));
   }
 
   @Override
@@ -67,7 +71,15 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T 
extends Dataset> implem
 
   private Optional<HiveTable> getTable(T dataset) throws IOException {
     DbAndTable dbAndTable = getDbAndTable(dataset);
-    return this.hiveRegister.getTable(dbAndTable.getDb(), 
dbAndTable.getTable());
+    log.info("Checking for table in DB: {} and Table: {}", dbAndTable.getDb(), 
dbAndTable.getTable());
+    Optional<HiveTable> hiveTable = 
this.hiveRegister.getTable(dbAndTable.getDb(), dbAndTable.getTable());
+
+    if (hiveTable.isPresent()) {
+      log.info("Table found in DB: {} and Table: {}. Exiting execution.", 
dbAndTable.getDb(), dbAndTable.getTable());
+    } else {
+      log.info("No table found in DB: {} and Table: {}.", dbAndTable.getDb(), 
dbAndTable.getTable());
+    }
+    return hiveTable;
   }
 
   private DbAndTable getDbAndTable(T dataset) {
@@ -77,7 +89,13 @@ public class DatasetHiveSchemaContainsNonOptionalUnion<T 
extends Dataset> implem
       "Expected pattern = %s", dataset.getUrn(), pattern.pattern()));
     }
 
-    return new DbAndTable(m.group(1), 
HiveMetaStoreUtils.getHiveTableName(m.group(2)));
+    String db = optionalDbName.or(m.group(1));
+    if (optionalDbName.isPresent()) {
+      log.info("DB name from pattern: {}. Replacing with provided DB name: 
{}", m.group(1), optionalDbName.get());
+    }
+
+    String table = HiveMetaStoreUtils.getHiveTableName(m.group(2));
+    return new DbAndTable(db, table);
   }
 
   boolean containsNonOptionalUnion(HiveTable table) {
diff --git 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
index 5436b5e6d8..a0608e64ed 100644
--- 
a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
+++ 
b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/predicates/DatasetHiveSchemaContainsNonOptionalUnionTest.java
@@ -56,8 +56,12 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest 
extends HiveMetastore
   private static File tmpDir;
   private static State state;
   private static String dbUri;
+
+  private static String dbOptionalUri;
+
   private static String testTable = "test_table01";
   private static String datasetUrn = 
String.format("/data/%s/streaming/test-Table01/hourly/2023/01/01", dbName);
+  private static String optionalDbName = "test_hourly";
 
   @AfterSuite
   public void clean() throws Exception {
@@ -84,9 +88,22 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest 
extends HiveMetastore
     final String avroSchema = "{\"type\": \"record\", \"name\": 
\"TestEvent\",\"namespace\": \"test.namespace\", \"fields\": "
         + "[{\"name\":\"fieldName\", \"type\": %s}]}";
     serdeProps.setProp("avro.schema.literal", String.format(avroSchema, 
"[\"string\", \"int\"]"));
-    HiveTable testTable = createTestHiveTable_Avro(serdeProps);
+    // Create table in main database
+    HiveTable testTable = createTestHiveTable_Avro(dbName, serdeProps);
     metastoreClient.createTable(HiveMetaStoreUtils.getTable(testTable));
 
+    // Create optional database
+    dbOptionalUri = String.format("%s/%s/%s", 
tmpDir.getAbsolutePath(),"metastore", optionalDbName);
+    try {
+      metastoreClient.getDatabase(optionalDbName);
+    } catch (NoSuchObjectException e) {
+      metastoreClient.createDatabase(
+          new Database(optionalDbName, "optional database", dbOptionalUri, 
Collections.emptyMap()));
+    }
+    // Create table in optional database
+    HiveTable optionalTestTable = createTestHiveTable_Avro(optionalDbName, 
serdeProps);
+    
metastoreClient.createTable(HiveMetaStoreUtils.getTable(optionalTestTable));
+
     state = 
ConfigUtils.configToState(ConfigUtils.propertiesToConfig(hiveConf.getAllProperties()));
     state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.PATTERN, 
"/data/(\\w+)/.*/([\\w\\d_-]+)/hourly.*");
     Assert.assertNotNull(metastoreClient.getTable(dbName, 
DatasetHiveSchemaContainsNonOptionalUnionTest.testTable));
@@ -99,13 +116,35 @@ public class DatasetHiveSchemaContainsNonOptionalUnionTest 
extends HiveMetastore
     Assert.assertTrue(predicate.test(dataset));
   }
 
-  private HiveTable createTestHiveTable_Avro(State props) {
+  @Test
+  public void testContainsNonOptionalUnionWithOptionalDbName() throws 
Exception {
+    state.setProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME, 
"test_hourly");
+    DatasetHiveSchemaContainsNonOptionalUnion predicate = new 
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+    Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
+    try {
+    Assert.assertTrue(predicate.test(dataset));
+    } finally {
+      // Clean up the property
+      
state.removeProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME);
+    }
+  }
+
+
+  @Test
+  public void testContainsNonOptionalUnionWithoutOptionalDbName() throws 
Exception {
+    // Ensure OPTIONAL_DB_NAME is not set
+    
state.removeProp(DatasetHiveSchemaContainsNonOptionalUnion.OPTIONAL_DB_NAME);
+    DatasetHiveSchemaContainsNonOptionalUnion predicate = new 
DatasetHiveSchemaContainsNonOptionalUnion(state.getProperties());
+    Dataset dataset = new SimpleDatasetForTesting(datasetUrn);
+    Assert.assertTrue(predicate.test(dataset));
+  }
+
+  private HiveTable createTestHiveTable_Avro(String dbName, State props) {
     HiveTable.Builder builder = new HiveTable.Builder();
     HiveTable hiveTable = 
builder.withDbName(dbName).withTableName(testTable).withProps(props).build();
     hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
     hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
     hiveTable.setSerDeType(AvroSerDe.class.getName());
-
     // Serialize then deserialize as a way to quickly setup table object
     Table table = HiveMetaStoreUtils.getTable(hiveTable);
     return HiveMetaStoreUtils.getHiveTable(table);

Reply via email to