This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c4a466b41 [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (#3716)
c4a466b41 is described below

commit c4a466b414c43a59ddead75de25ae87e6732b716
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Jul 18 15:08:07 2023 -0700

    [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (#3716)
---
 .../gobblin/hive/writer/HiveMetadataWriter.java    | 63 +++++++++++++---------
 .../iceberg/writer/IcebergMetadataWriter.java      | 15 ++++--
 .../iceberg/writer/HiveMetadataWriterTest.java     |  6 +++
 3 files changed, 55 insertions(+), 29 deletions(-)

diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 02c0c5895..138ac3d94 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -17,16 +17,6 @@
 
 package org.apache.gobblin.hive.writer;
 
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
@@ -38,20 +28,37 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-import javax.annotation.Nullable;
 
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificData;
+
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
 import org.apache.gobblin.hive.HiveRegister;
 import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
@@ -68,10 +75,6 @@ import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.ClustersNames;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
 
 /**
  * This writer is used to register the hiveSpec into hive metaStore
@@ -313,9 +316,15 @@ public class HiveMetadataWriter implements MetadataWriter {
       return false;
     }
 
-    HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get();
-    latestSchemaMap.put(tableKey,
-        existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+    HiveTable table = hiveRegister.getTable(dbName, tableName).get();
+    String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+    if (latestSchema == null) {
+      throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is "
+              + "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should "
+              + "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL));
+    }
+
+    latestSchemaMap.put(tableKey, latestSchema);
     return true;
   }
 
@@ -445,10 +454,14 @@ public class HiveMetadataWriter implements MetadataWriter {
       return;
     }
     //Force to set the schema even there is no schema literal defined in the spec
-    if (latestSchemaMap.containsKey(tableKey)) {
-      spec.getTable().getSerDeProps()
-          .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
-      HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+    String latestSchema = latestSchemaMap.get(tableKey);
+    if (latestSchema != null) {
+      String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+      if (tableSchema == null || !tableSchema.equals(latestSchema)) {
+        spec.getTable().getSerDeProps()
+            .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+        HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+      }
     }
   }
 
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 57a28f778..b6c9e15dc 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -48,10 +48,7 @@ import java.util.stream.Stream;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.specific.SpecificData;
-
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.gobblin.hive.writer.MetadataWriterKeys;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -112,6 +109,7 @@ import org.apache.gobblin.hive.HivePartition;
 import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
 import org.apache.gobblin.hive.spec.HiveSpec;
 import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.hive.writer.MetadataWriterKeys;
 import org.apache.gobblin.iceberg.Utils.IcebergUtils;
 import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
 import org.apache.gobblin.metadata.OperationType;
@@ -122,6 +120,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.stream.RecordEnvelope;
 import org.apache.gobblin.time.TimeIterator;
 import org.apache.gobblin.util.AvroUtils;
@@ -129,6 +128,7 @@ import org.apache.gobblin.util.ClustersNames;
 import org.apache.gobblin.util.HadoopUtils;
 import org.apache.gobblin.util.ParallelRunner;
 import org.apache.gobblin.util.WriterUtils;
+
 import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
 
 /**
@@ -493,13 +493,20 @@ public class IcebergMetadataWriter implements MetadataWriter {
    * @return table with updated schema and partition spec
    */
   private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+    boolean isTableUpdated = false;
     if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
       table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
+      isTableUpdated = true;
     }
     if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
       table.updateSpec().addField(fieldName).commit();
+      isTableUpdated = true;
+    }
+
+    if (isTableUpdated) {
+      table.refresh();
     }
-    table.refresh();
+
     return table;
   }
 
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index f50bc81aa..aa6c73d52 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -388,6 +388,12 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
     ));
     Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed));
     Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed));
+
+    HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class);
+    String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable";
+    Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral));
+    Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State());
+    Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
   }
 
   private String writeRecord(File file) throws IOException {
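
For context, a minimal, hypothetical sketch of the pattern this commit applies in HiveMetadataWriter: cache the latest known avro.schema.literal per table, fail fast when a Gobblin-managed table is missing it, and skip the Hive metastore update when the spec already carries the cached schema. The class and method names below are illustrative only, not the actual Gobblin APIs.

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; not the actual Gobblin classes.
public class SchemaUpdateSketch {

  private static final String SCHEMA_LITERAL = "avro.schema.literal";

  private final Map<String, String> latestSchemaMap = new HashMap<>();

  // Mirrors the new null check: a Gobblin-managed table must carry the schema literal.
  public void cacheLatestSchema(String tableKey, String schemaLiteral) {
    if (schemaLiteral == null) {
      throw new IllegalStateException(
          String.format("%s is null for table %s; table was not created through Gobblin", SCHEMA_LITERAL, tableKey));
    }
    latestSchemaMap.put(tableKey, schemaLiteral);
  }

  // Returns true only when a Hive call would actually be issued; unchanged schemas are skipped.
  public boolean updateSchemaIfNeeded(String tableKey, Map<String, String> serDeProps) {
    String latestSchema = latestSchemaMap.get(tableKey);
    if (latestSchema == null || latestSchema.equals(serDeProps.get(SCHEMA_LITERAL))) {
      return false; // nothing cached, or already up to date -> no metastore round trip
    }
    serDeProps.put(SCHEMA_LITERAL, latestSchema);
    return true; // caller would now push the updated spec (and column info) to Hive
  }
}

The IcebergMetadataWriter change in this commit follows the same idea: table.refresh() is only invoked when a column or partition field was actually added.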
