This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new b2f9cab [GOBBLIN-1248][GOBBLIN-1223] Fix discrepancy between table
schema and file schema
b2f9cab is described below
commit b2f9cab5c7a192ac35b5b2764bf1449750bc6bf6
Author: Zihan Li <[email protected]>
AuthorDate: Fri Aug 28 11:01:59 2020 -0700
[GOBBLIN-1248][GOBBLIN-1223] Fix discrepancy between table schema and file
schema
Closes #3091 from ZihanLi58/GOBBLIN-1248
---
.../hive/metastore/HiveMetaStoreBasedRegister.java | 71 ++++++++---
.../metastore/HiveMetaStoreBasedRegisterTest.java | 133 +++++++++++++++++++++
2 files changed, 186 insertions(+), 18 deletions(-)
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
index 02d38dc..31189ef 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegister.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.hive.metastore;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -29,12 +30,12 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
+import org.apache.avro.Schema;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.gobblin.hive.AutoCloseableHiveLock;
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistry;
-import org.apache.gobblin.kafka.schemareg.KafkaSchemaRegistryFactory;
-import org.apache.gobblin.kafka.schemareg.SchemaRegistryException;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
+import org.apache.gobblin.util.AvroUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
@@ -147,18 +148,19 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
//for a partition is immutable
private final boolean skipDiffComputation;
- private Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
+ @VisibleForTesting
+ protected Optional<KafkaSchemaRegistry> schemaRegistry = Optional.absent();
private String topicName = "";
public HiveMetaStoreBasedRegister(State state, Optional<String>
metastoreURI) throws IOException {
super(state);
this.locks = new HiveLock(state.getProperties());
- this.optimizedChecks =
state.getPropAsBoolean(this.OPTIMIZED_CHECK_ENABLED, true);
- this.skipDiffComputation =
state.getPropAsBoolean(this.SKIP_PARTITION_DIFF_COMPUTATION, false);
- this.shouldUpdateLatestSchema =
state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false);
- this.registerPartitionWithPullMode =
state.getPropAsBoolean(this.REGISTER_PARTITION_WITH_PULL_MODE, false);
- if(state.getPropAsBoolean(this.FETCH_LATEST_SCHEMA, false)) {
- this.schemaRegistry =
Optional.of(KafkaSchemaRegistryFactory.getSchemaRegistry(state.getProperties()));
+ this.optimizedChecks = state.getPropAsBoolean(OPTIMIZED_CHECK_ENABLED,
true);
+ this.skipDiffComputation =
state.getPropAsBoolean(SKIP_PARTITION_DIFF_COMPUTATION, false);
+ this.shouldUpdateLatestSchema =
state.getPropAsBoolean(FETCH_LATEST_SCHEMA, false);
+ this.registerPartitionWithPullMode =
state.getPropAsBoolean(REGISTER_PARTITION_WITH_PULL_MODE, false);
+ if(this.shouldUpdateLatestSchema) {
+ this.schemaRegistry =
Optional.of(KafkaSchemaRegistry.get(state.getProperties()));
topicName = state.getProp(KafkaSource.TOPIC_NAME);
}
@@ -192,16 +194,49 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
throw new IOException(e);
}
}
- //TODO: We need to find a better to get the latest schema
- private void updateSchema(HiveSpec spec, Table table) throws IOException{
+
+ /**
+   * This method is used to update the table schema to the latest schema.
+ * It will fetch creation time of the latest schema from schema registry and
compare that
+ * with the creation time of writer's schema. If they are the same, then we
will update the
+   * table schema to the writer's schema; otherwise we will keep the table schema
the same as the schema of the
+   * existing table.
+ * Note: If there is no schema specified in the table spec, we will directly
update the schema to
+ * the existing table schema
+   * Note: We cannot treat the creation time as the version number of a schema,
since the schema registry allows
+   * "out of order registration" of schemas; this means the chronological latest
is NOT what the registry considers latest.
+ * @param spec
+ * @param table
+ * @param existingTable
+ * @throws IOException
+ */
+ @VisibleForTesting
+ protected void updateSchema(HiveSpec spec, Table table, HiveTable
existingTable) throws IOException{
if (this.schemaRegistry.isPresent()) {
try (Timer.Context context =
this.metricContext.timer(GET_AND_SET_LATEST_SCHEMA).time()) {
- String latestSchema =
this.schemaRegistry.get().getLatestSchema(topicName).toString();
-
spec.getTable().getSerDeProps().setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
latestSchema);
-
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
- } catch (SchemaRegistryException | IOException e) {
- log.error(String.format("Error when fetch latest schema for topic %s",
topicName), e);
+ Schema existingTableSchema = new
Schema.Parser().parse(existingTable.getSerDeProps().getProp(
+ AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()));
+ String existingSchemaCreationTime =
AvroUtils.getSchemaCreationTime(existingTableSchema);
+ // If no schema set for the table spec, we fall back to existing schema
+ Schema writerSchema = new Schema.Parser().parse((
+
spec.getTable().getSerDeProps().getProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
existingTableSchema.toString())));
+ String writerSchemaCreationTime =
AvroUtils.getSchemaCreationTime(writerSchema);
+ if(existingSchemaCreationTime != null &&
!existingSchemaCreationTime.equals(writerSchemaCreationTime)) {
+        // If the creation time of the writer schema does not equal that of the
existing schema, we compare with the schema fetched from
+        // the schema registry to determine whether to update the schema
+ Schema latestSchema = (Schema)
this.schemaRegistry.get().getLatestSchemaByTopic(topicName);
+ String latestSchemaCreationTime =
AvroUtils.getSchemaCreationTime(latestSchema);
+ if (latestSchemaCreationTime != null &&
latestSchemaCreationTime.equals(existingSchemaCreationTime)) {
+          // If the latest schema creation time equals the existing schema
creation time, we keep the schema as the existing table schema
+ spec.getTable()
+ .getSerDeProps()
+
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
existingTableSchema);
+
table.getSd().setSerdeInfo(HiveMetaStoreUtils.getSerDeInfo(spec.getTable()));
+ }
+ }
+ } catch ( IOException e) {
+ log.error(String.format("Error when updating latest schema for topic
%s", topicName));
throw new IOException(e);
}
}
@@ -236,7 +271,7 @@ public class HiveMetaStoreBasedRegister extends
HiveRegister {
existingTable =
HiveMetaStoreUtils.getHiveTable(client.getTable(dbName, tableName));
}
if(shouldUpdateLatestSchema) {
- updateSchema(spec, table);
+ updateSchema(spec, table, existingTable);
}
if (needToUpdateTable(existingTable,
HiveMetaStoreUtils.getHiveTable(table))) {
try (Timer.Context context =
this.metricContext.timer(ALTER_TABLE).time()) {
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
new file mode 100644
index 0000000..71647c7
--- /dev/null
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreBasedRegisterTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.hive.metastore;
+
+import com.google.common.base.Optional;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.avro.Schema;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.hive.spec.SimpleHiveSpec;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.util.AvroUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
+import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class HiveMetaStoreBasedRegisterTest {
+ @Test
+ public void testUpdateSchemaMethod() throws IOException {
+
+ final String databaseName = "testdb";
+ final String tableName = "testtable";
+
+ State state = new State();
+ state.setProp(HiveMetaStoreBasedRegister.FETCH_LATEST_SCHEMA, true);
+ state.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
MockSchemaRegistry.class.getName());
+ HiveMetaStoreBasedRegister register = new
HiveMetaStoreBasedRegister(state, Optional.absent());
+ Schema writerSchema = new Schema.Parser().parse("{\"type\": \"record\",
\"name\": \"TestEvent\","
+ + " \"namespace\": \"test.namespace\", \"fields\":
[{\"name\":\"testName\"," + " \"type\": \"int\"}]}");
+ AvroUtils.setSchemaCreationTime(writerSchema, "111");
+
+ //Build hiveTable
+ HiveTable.Builder builder = new HiveTable.Builder();
+ builder.withDbName(databaseName).withTableName(tableName);
+
+ State serdeProps = new State();
+ serdeProps.setProp("avro.schema.literal", writerSchema.toString());
+ builder.withSerdeProps(serdeProps);
+
+ HiveTable hiveTable = builder.build();
+ HiveTable existingTable = builder.build();
+
+ hiveTable.setInputFormat(AvroContainerInputFormat.class.getName());
+ hiveTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+ hiveTable.setSerDeType(AvroSerDe.class.getName());
+
+ existingTable.setInputFormat(AvroContainerInputFormat.class.getName());
+ existingTable.setOutputFormat(AvroContainerOutputFormat.class.getName());
+ existingTable.setSerDeType(AvroSerDe.class.getName());
+
+ SimpleHiveSpec.Builder specBuilder = new SimpleHiveSpec.Builder(new
Path("pathString"))
+ .withPartition(Optional.absent())
+ .withTable(hiveTable);
+ Table table = HiveMetaStoreUtils.getTable(hiveTable);
+ SimpleHiveSpec simpleHiveSpec = specBuilder.build();
+
+ //Test new schema equals existing schema, we don't change anything
+ register.updateSchema(simpleHiveSpec, table, existingTable);
+ Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+ .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()),
writerSchema.toString());
+
+    //Test new schema does not equal the existing schema, and latest schema
does not equal the existing schema
+    //We set the schema to the writer schema
+ register.schemaRegistry.get().register(writerSchema, "writerSchema");
+ Schema existingSchema = new Schema.Parser().parse("{\"type\": \"record\",
\"name\": \"TestEvent_1\","
+ + " \"namespace\": \"test.namespace\", \"fields\":
[{\"name\":\"testName_1\"," + " \"type\": \"double\"}]}");
+ AvroUtils.setSchemaCreationTime(existingSchema, "110");
+ existingTable.getSerDeProps()
+
.setProp(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName(),
existingSchema.toString());
+ register.updateSchema(simpleHiveSpec, table, existingTable);
+ Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+ .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()),
writerSchema.toString());
+
+    //Test new schema does not equal the existing schema, and latest schema equals
the existing schema,
+ //in this case, table schema should be existingSchema
+ register.schemaRegistry.get().register(existingSchema, "existingSchema");
+ register.updateSchema(simpleHiveSpec, table, existingTable);
+ Assert.assertEquals(table.getSd().getSerdeInfo().getParameters()
+ .get(AvroSerdeUtils.AvroTableProperties.SCHEMA_LITERAL.getPropName()),
existingSchema.toString());
+
+ }
+
+ public static class MockSchemaRegistry extends KafkaSchemaRegistry<String,
Schema> {
+ static Schema latestSchema = Schema.create(Schema.Type.STRING);
+
+ public MockSchemaRegistry(Properties props) {
+ super(props);
+ }
+
+ @Override
+ protected Schema fetchSchemaByKey(String key) throws
SchemaRegistryException {
+ return null;
+ }
+
+ @Override
+ public Schema getLatestSchemaByTopic(String topic) throws
SchemaRegistryException {
+ return latestSchema;
+ }
+
+ @Override
+ public String register(Schema schema) throws SchemaRegistryException {
+ return null;
+ }
+
+ @Override
+ public String register(Schema schema, String name) throws
SchemaRegistryException {
+ this.latestSchema = schema;
+ return schema.toString();
+ }
+ }
+}