This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new b2f9cab  [GOBBLIN-1248][GOBBLIN-1223] Fix discrepancy between table 
schema and file schema
b2f9cab is described below

commit b2f9cab5c7a192ac35b5b2764bf1449750bc6bf6
Author: Zihan Li <[email protected]>
AuthorDate: Fri Aug 28 11:01:59 2020 -0700

    [GOBBLIN-1248][GOBBLIN-1223] Fix discrepancy between table schema and file 
schema
    
    Closes #3091 from ZihanLi58/GOBBLIN-1248
---
 .../hive/metastore/HiveMetaStoreBasedRegister.java |  71 ++++++++---
 .../metastore/HiveMetaStoreBasedRegisterTest.java  | 133 +++++++++++++++++++++
 2 files changed, 186 insertions(+), 18 deletions(-)

diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 02d38dc..31189ef 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.hive.metastore;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -29,12 +30,12 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.avro.Schema;
 import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.gobblin.hive.AutoCloseableHiveLock;
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
-import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.util.AvroUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -147,18 +148,19 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
   //for a partition is immutable
   private final boolean skipDiffComputation;
 
-  private Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
+  @VisibleForTesting
+  protected Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
   private String topicName = "";
   public HiveMetaStoreBasedRegister(State state, Optional<String> 
metastoreURI) throws IOException {
     super(state);
     this.locks = new HiveLock(state.getProperties());
 
-    this.optimizedChecks = 
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
-    this.skipDiffComputation = 
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
-    this.shouldUpdateLatestSchema = 
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
-    this.registerPartitionWithPullMode = 
state.getPropAsBoolean(this.REGISTER_PARTITION_WITH_PULL_MODE, false);
-    if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
-      this.schemaRegistry = 
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
+    this.optimizedChecks = state.getPropAsBoolean(OPTIMIZED_CHECK_ENABLED, 
true);
+    this.skipDiffComputation = 
state.getPropAsBoolean(SKIP_PARTITION_DIFF_COMPUTATION, false);
+    this.shouldUpdateLatestSchema = 
state.getPropAsBoolean(FETCH_LATEST_SCHEMA, false);
+    this.registerPartitionWithPullMode = 
state.getPropAsBoolean(REGISTER_PARTITION_WITH_PULL_MODE, false);
+    if(this.shouldUpdateLatestSchema) {
+      this.schemaRegistry = 
Optional.of(KafkaSchemaRegistry.get(state.getProperties()));
       topicName = state.getProp(KafkaSource.TOPIC_NAME);
     }
 
@@ -192,16 +194,49 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
       throw new IOException(e);
     }
   }
-  //TODO: We need to find a better to get the latest schema
-  private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+  /**
+   * This method is used to update the table schema to the latest schema
+   * It will fetch creation time of the latest schema from schema registry and 
compare that
+   * with the creation time of writer's schema. If they are the same, then we 
will update the
+   * table schema to the writer's schema, else we will keep the table schema 
the same as schema of
+   * existing table.
+   * Note: If there is no schema specified in the table spec, we will directly 
update the schema to
+   * the existing table schema
+   * Note: We cannot treat the creation time as version number of schema, 
since schema registry allows
+   * "out of order registration" of schemas, this means chronological latest 
is NOT what the registry considers latest.
+   * @param spec
+   * @param table
+   * @param existingTable
+   * @throws IOException
+   */
+  @VisibleForTesting
+  protected void updateSchema(HiveSpec spec, Table table, HiveTable 
existingTable) throws IOException{
 
     if (this.schemaRegistry.isPresent()) {
       try (Timer.Context context = 
this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
-        String latestSchema = 
this.schemaRegistry.get().getLatestSchema(topicName).toString();
-        
spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
 latestSchema);
-        
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
-      } catch (SchemaRegistryException | IOException e) {
-        log.error(String.format("Error when fetch latest schema for topic %s", 
topicName), e);
+        Schema existingTableSchema = new 
Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+            AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+        String existingSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(existingTableSchema);
+        // If no schema set for the table spec, we fall back to existing schema
+        Schema writerSchema = new Schema.Parser().parse((
+            
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
 existingTableSchema.toString())));
+        String writerSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(writerSchema);
+        if(existingSchemaCreationTime != null && 
!existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+          // If the creation time of the writer schema does not equal that of the existing
+          // schema, we compare against the schema fetched from the schema registry to
+          // determine whether to update the schema
+          Schema latestSchema = (Schema) 
this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+          String latestSchemaCreationTime = 
AvroUtils.getSchemaCreationTime(latestSchema);
+          if (latestSchemaCreationTime != null && 
latestSchemaCreationTime.equals(existingSchemaCreationTime)) {
+            // If the latest schema's creation time equals the existing schema's creation
+            // time, we keep the schema as the existing table schema
+            spec.getTable()
+                .getSerDeProps()
+                
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), 
existingTableSchema);
+            
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+          }
+        }
+      } catch ( IOException e) {
+        log.error(String.format("Error when updating latest schema for topic 
%s", topicName));
         throw new IOException(e);
       }
     }
@@ -236,7 +271,7 @@ public class HiveMetaStoreBasedRegister extends 
HiveRegister {
           existingTable = 
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
         }
         if(shouldUpdateLatestSchema) {
-          updateSchema(spec, table);
+          updateSchema(spec, table, existingTable);
         }
         if (needToUpdateTable(existingTable, 
HiveMetaStoreUtils.getHiveTable(table))) {
           try (Timer.Context context = 
this.metricContext.timer(ALTER_TABLE).time()) {
diff --git 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
new file mode 100644
index 0000000..71647c7
--- /dev/null
+++ 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.hive.metastore;
+
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.SimpleHiveSpec;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class HiveMetaStoreBasedRegisterTest {
+  @Test
+  public void testUpdateSchemaMethod() throws IOException {
+
+    final String databaseName = "testdb";
+    final String tableName = "testtable";
+
+    State state = new State();
+    state.setProp(HiveMetaStoreBasedRegister.FETCH_LATEST_SCHEMA, true);
+    state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS, 
MockSchemaRegistry.class.getName());
+    HiveMetaStoreBasedRegister register = new 
HiveMetaStoreBasedRegister(state, Optional.absent());
+    Schema writerSchema = new Schema.Parser().parse("{\"type\": \"record\", 
\"name\": \"TestEvent\","
+        + " \"namespace\": \"test.namespace\", \"fields\": 
[{\"name\":\"testName\"," + " \"type\": \"int\"}]}");
+    AvroUtils.setSchemaCreationTime(writerSchema, "111");
+
+    //Build hiveTable
+    HiveTable.Builder builder = new HiveTable.Builder();
+    builder.withDbName(databaseName).withTableName(tableName);
+
+    State serdeProps = new State();
+    serdeProps.setProp("avro.schema.literal", writerSchema.toString());
+    builder.withSerdeProps(serdeProps);
+
+    HiveTable hiveTable = builder.build();
+    HiveTable existingTable = builder.build();
+
+    hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
+    hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+    hiveTable.setSerDeType(AvroSerDe.class.getName());
+
+    existingTable.setInputFormat(AvroContainerInputFormat.class.getName());
+    existingTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+    existingTable.setSerDeType(AvroSerDe.class.getName());
+
+    SimpleHiveSpec.Builder specBuilder = new SimpleHiveSpec.Builder(new 
Path("pathString"))
+        .withPartition(Optional.absent())
+        .withTable(hiveTable);
+    Table table = HiveMetaStoreUtils.getTable(hiveTable);
+    SimpleHiveSpec simpleHiveSpec = specBuilder.build();
+
+    //Test new schema equals existing schema, we don't change anything
+    register.updateSchema(simpleHiveSpec, table, existingTable);
+    Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+        .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()), 
writerSchema.toString());
+
+    //Test: the new schema does not equal the existing schema, and the latest schema
+    //does not equal the existing schema
+    //We set the table schema to the writer schema
+    register.schemaRegistry.get().register(writerSchema, "writerSchema");
+    Schema existingSchema = new Schema.Parser().parse("{\"type\": \"record\", 
\"name\": \"TestEvent_1\","
+        + " \"namespace\": \"test.namespace\", \"fields\": 
[{\"name\":\"testName_1\"," + " \"type\": \"double\"}]}");
+    AvroUtils.setSchemaCreationTime(existingSchema, "110");
+    existingTable.getSerDeProps()
+        
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(), 
existingSchema.toString());
+    register.updateSchema(simpleHiveSpec, table, existingTable);
+    Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+        .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()), 
writerSchema.toString());
+
+    //Test: the new schema does not equal the existing schema, and the latest schema equals
+    //the existing schema; in this case, the table schema should be existingSchema
+    register.schemaRegistry.get().register(existingSchema, "existingSchema");
+    register.updateSchema(simpleHiveSpec, table, existingTable);
+    Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+        .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()), 
existingSchema.toString());
+
+  }
+
+  public static class MockSchemaRegistry extends KafkaSchemaRegistry<String, 
Schema> {
+    static Schema latestSchema = Schema.create(Schema.Type.STRING);
+
+    public MockSchemaRegistry(Properties props) {
+      super(props);
+    }
+
+    @Override
+    protected Schema fetchSchemaByKey(String key) throws 
SchemaRegistryException {
+      return null;
+    }
+
+    @Override
+    public Schema getLatestSchemaByTopic(String topic) throws 
SchemaRegistryException {
+      return latestSchema;
+    }
+
+    @Override
+    public String register(Schema schema) throws SchemaRegistryException {
+      return null;
+    }
+
+    @Override
+    public String register(Schema schema, String name) throws 
SchemaRegistryException {
+      this.latestSchema = schema;
+      return schema.toString();
+    }
+  }
+}

Reply via email to