This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c4a466b41 [GOBBLIN-1853] Reduce # of Hive calls during schema related updates (#3716)
c4a466b41 is described below
commit c4a466b414c43a59ddead75de25ae87e6732b716
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Jul 18 15:08:07 2023 -0700
[GOBBLIN-1853] Reduce # of Hive calls during schema related updates (#3716)
---
.../gobblin/hive/writer/HiveMetadataWriter.java | 63 +++++++++++++---------
.../iceberg/writer/IcebergMetadataWriter.java | 15 ++++--
.../iceberg/writer/HiveMetadataWriterTest.java | 6 +++
3 files changed, 55 insertions(+), 29 deletions(-)
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
index 02c0c5895..138ac3d94 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/writer/HiveMetadataWriter.java
@@ -17,16 +17,6 @@
package org.apache.gobblin.hive.writer;
-import com.codahale.metrics.Timer;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Optional;
-import com.google.common.base.Throwables;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.io.Closer;
-import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -38,20 +28,37 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-import javax.annotation.Nullable;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificData;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+
+import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Throwables;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import javax.annotation.Nullable;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.specific.SpecificData;
+
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
import org.apache.gobblin.hive.HiveRegister;
import org.apache.gobblin.hive.HiveRegistrationUnit;
import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
import org.apache.gobblin.hive.metastore.HiveMetaStoreBasedRegister;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
@@ -68,10 +75,6 @@ import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.util.AvroUtils;
import org.apache.gobblin.util.ClustersNames;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
-
/**
* This writer is used to register the hiveSpec into hive metaStore
@@ -313,9 +316,15 @@ public class HiveMetadataWriter implements MetadataWriter {
return false;
}
- HiveTable existingTable = hiveRegister.getTable(dbName, tableName).get();
- latestSchemaMap.put(tableKey,
- existingTable.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ HiveTable table = hiveRegister.getTable(dbName, tableName).get();
+ String latestSchema = table.getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+ if (latestSchema == null) {
+ throw new IllegalStateException(String.format("The %s in the table %s.%s is null. This implies the DB is "
+ + "misconfigured and was not correctly created through Gobblin, since all Gobblin managed tables should "
+ + "have %s", HiveAvroSerDeManager.SCHEMA_LITERAL, dbName, tableName, HiveAvroSerDeManager.SCHEMA_LITERAL));
+ }
+
+ latestSchemaMap.put(tableKey, latestSchema);
return true;
}
@@ -445,10 +454,14 @@ public class HiveMetadataWriter implements MetadataWriter {
return;
}
//Force to set the schema even there is no schema literal defined in the spec
- if (latestSchemaMap.containsKey(tableKey)) {
- spec.getTable().getSerDeProps()
- .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
- HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+ String latestSchema = latestSchemaMap.get(tableKey);
+ if (latestSchema != null) {
+ String tableSchema = spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName());
+ if (tableSchema == null || !tableSchema.equals(latestSchema)) {
+ spec.getTable().getSerDeProps()
+ .setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), latestSchemaMap.get(tableKey));
+ HiveMetaStoreUtils.updateColumnsInfoIfNeeded(spec);
+ }
}
}
diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
index 57a28f778..b6c9e15dc 100644
--- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
+++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/writer/IcebergMetadataWriter.java
@@ -48,10 +48,7 @@ import java.util.stream.Stream;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificData;
-
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.gobblin.hive.writer.MetadataWriterKeys;
-import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -112,6 +109,7 @@ import org.apache.gobblin.hive.HivePartition;
import org.apache.gobblin.hive.metastore.HiveMetaStoreUtils;
import org.apache.gobblin.hive.spec.HiveSpec;
import org.apache.gobblin.hive.writer.MetadataWriter;
+import org.apache.gobblin.hive.writer.MetadataWriterKeys;
import org.apache.gobblin.iceberg.Utils.IcebergUtils;
import org.apache.gobblin.metadata.GobblinMetadataChangeEvent;
import org.apache.gobblin.metadata.OperationType;
@@ -122,6 +120,7 @@ import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.LongWatermark;
import org.apache.gobblin.stream.RecordEnvelope;
import org.apache.gobblin.time.TimeIterator;
import org.apache.gobblin.util.AvroUtils;
@@ -129,6 +128,7 @@ import org.apache.gobblin.util.ClustersNames;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.ParallelRunner;
import org.apache.gobblin.util.WriterUtils;
+
import static org.apache.gobblin.iceberg.writer.IcebergMetadataWriterConfigKeys.*;
/**
@@ -493,13 +493,20 @@ public class IcebergMetadataWriter implements MetadataWriter {
* @return table with updated schema and partition spec
*/
private Table addPartitionToIcebergTable(Table table, String fieldName, String type) {
+ boolean isTableUpdated = false;
if(!table.schema().columns().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSchema().addColumn(fieldName, Types.fromPrimitiveString(type)).commit();
+ isTableUpdated = true;
}
if(!table.spec().fields().stream().anyMatch(x -> x.name().equalsIgnoreCase(fieldName))) {
table.updateSpec().addField(fieldName).commit();
+ isTableUpdated = true;
+ }
+
+ if (isTableUpdated) {
+ table.refresh();
}
- table.refresh();
+
return table;
}
diff --git a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
index f50bc81aa..aa6c73d52 100644
--- a/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
+++ b/gobblin-iceberg/src/test/java/org/apache/gobblin/iceberg/writer/HiveMetadataWriterTest.java
@@ -388,6 +388,12 @@ public class HiveMetadataWriterTest extends HiveMetastoreTest {
));
Assert.assertTrue(updateLatestSchema.apply(tableNameAllowed));
Mockito.verify(hiveRegister, Mockito.times(2)).getTable(eq(dbName), eq(tableNameAllowed));
+
+ HiveTable tableThatHasNoSchemaLiteral = Mockito.mock(HiveTable.class);
+ String nameOfTableThatHasNoSchemaLiteral = "improperlyConfiguredTable";
+ Mockito.when(hiveRegister.getTable(eq(dbName), eq(nameOfTableThatHasNoSchemaLiteral))).thenReturn(Optional.of(tableThatHasNoSchemaLiteral));
+ Mockito.when(tableThatHasNoSchemaLiteral.getSerDeProps()).thenReturn(new State());
+ Assert.assertThrows(IllegalStateException.class, () -> updateLatestSchema.apply(nameOfTableThatHasNoSchemaLiteral));
}
private String writeRecord(File file) throws IOException {