This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 78da5dd [GOBBLIN-1345] Implement a Kafka schema change detecting
converter
78da5dd is described below
commit 78da5dd29b4e5e400c21dd78b6044841372771e5
Author: suvasude <[email protected]>
AuthorDate: Wed Jan 6 09:16:05 2021 -0800
[GOBBLIN-1345] Implement a Kafka schema change detecting converter
Closes #3182 from sv2000/kafkaSchemaChangeInjector
---
...enericRecordBasedKafkaSchemaChangeInjector.java | 36 ++++++
.../converter/KafkaSchemaChangeInjector.java | 125 +++++++++++++++++++++
.../converter/KafkaSchemaChangeInjectorTest.java | 125 +++++++++++++++++++++
3 files changed, 286 insertions(+)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
new file mode 100644
index 0000000..280b771
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+
+
+/**
+ * A {@link org.apache.gobblin.stream.ControlMessageInjector} that detects
changes in the latest schema and notifies downstream constructs by
+ * injecting a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}.
+ * Also supports multi-dataset schema changes.
+ */
+public class GenericRecordBasedKafkaSchemaChangeInjector extends
KafkaSchemaChangeInjector<Schema> {
+ @Override
+ protected Schema getSchemaIdentifier(DecodeableKafkaRecord consumerRecord) {
+ GenericRecord genericRecord = (GenericRecord) consumerRecord.getValue();
+ return genericRecord.getSchema();
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
new file mode 100644
index 0000000..42bf428
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.avro.Schema;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metadata.GlobalMetadata;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.ControlMessageInjector;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+/**
+ * A {@link org.apache.gobblin.stream.MetadataUpdateControlMessage} that
detects changes in the latest schema and notifies downstream constructs by
+ * injecting a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}.
+ */
+public abstract class KafkaSchemaChangeInjector<S>
+ extends ControlMessageInjector<Schema, DecodeableKafkaRecord> {
+ @VisibleForTesting
+ @Getter(AccessLevel.PACKAGE)
+ private KafkaSchemaRegistry<String, Schema> schemaRegistry;
+
+ @VisibleForTesting
+ @Getter(AccessLevel.PACKAGE)
+ private Cache<S, String> schemaCache;
+ private Schema latestSchema;
+ private GlobalMetadata<Schema> globalMetadata;
+
+ // classes that extend this need to implement getSchemaIdentifier
+ protected abstract S getSchemaIdentifier(DecodeableKafkaRecord
consumerRecord);
+
+ @Override
+ public ControlMessageInjector<Schema, DecodeableKafkaRecord>
init(WorkUnitState workUnitState) {
+ this.schemaRegistry =
KafkaSchemaRegistry.get(workUnitState.getProperties());
+ this.schemaCache = CacheBuilder.newBuilder().expireAfterAccess(1,
TimeUnit.HOURS).build();
+ return this;
+ }
+
+ @Override
+ public void setInputGlobalMetadata(GlobalMetadata<Schema>
inputGlobalMetadata, WorkUnitState workUnitState) {
+ this.globalMetadata = inputGlobalMetadata;
+ }
+
+ /**
+ * Inject a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}
if the latest schema has changed. Check whether there is a new latest
+ * schema if the input record's schema is not present in the schema cache.
+ *
+ * @param inputRecordEnvelope input record envelope
+ * @param workUnitState work unit state
+ * @return the injected messages
+ */
+ @Override
+ public Iterable<ControlMessage<DecodeableKafkaRecord>>
injectControlMessagesBefore(
+ RecordEnvelope<DecodeableKafkaRecord> inputRecordEnvelope, WorkUnitState
workUnitState) {
+ DecodeableKafkaRecord consumerRecord = inputRecordEnvelope.getRecord();
+ S schemaIdentifier = getSchemaIdentifier(consumerRecord);
+ String topicName = consumerRecord.getTopic();
+
+ // If a new schema is seen then check the latest schema in the registry
has changed.
+ // Only check for the latest schema when a new schema is seen since the
call to get the latest schema is not
+ // cacheable and is expensive.
+ if (this.schemaCache.getIfPresent(schemaIdentifier) == null) {
+ try {
+ Schema latestSchema =
this.schemaRegistry.getLatestSchemaByTopic(topicName);
+
+ this.schemaCache.put(schemaIdentifier, "");
+
+ // latest schema changed, so inject a metadata update control message
+ if (!latestSchema.equals(this.latestSchema)) {
+ // update the metadata in this injector since the control message is
only applied downstream
+ this.globalMetadata =
GlobalMetadata.builderWithInput(this.globalMetadata,
Optional.of(latestSchema)).build();
+ // update the latestSchema
+ this.latestSchema = latestSchema;
+ // inject a metadata update control message before the record so
that the downstream constructs
+ // are aware of the new schema before processing the record
+ ControlMessage datasetLevelMetadataUpdate = new
MetadataUpdateControlMessage(this.globalMetadata);
+
+ return Collections.singleton(datasetLevelMetadataUpdate);
+ }
+ } catch (SchemaRegistryException e) {
+ throw new RuntimeException("Exception when getting the latest schema
for topic " + topicName, e);
+ }
+ }
+
+ // no schema change detected
+ return null;
+ }
+
+ @Override
+ public Iterable<ControlMessage<DecodeableKafkaRecord>>
injectControlMessagesAfter(
+ RecordEnvelope<DecodeableKafkaRecord> inputRecordEnvelope, WorkUnitState
workUnitState) {
+ return null;
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
new file mode 100644
index 0000000..c73e7a9
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metadata.GlobalMetadata;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamTestUtils;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.RecordEnvelope;
+
/**
 * Unit tests for {@link KafkaSchemaChangeInjector}, verifying that a
 * MetadataUpdateControlMessage is injected exactly when the latest registered schema changes,
 * and that the schema cache suppresses redundant registry lookups.
 */
public class KafkaSchemaChangeInjectorTest {
  // Test Avro schemas
  private static final String SCHEMA1 = "{\"namespace\": \"example.avro\",\n" +
      " \"type\": \"record\",\n" +
      " \"name\": \"user\",\n" +
      " \"fields\": [\n" +
      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
      "     {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" +
      " ]\n" +
      "}";

  @Test
  public void testInjection() throws SchemaRegistryException, IOException {

    String datasetName = "topic1";

    // Minimal concrete subclass: identifies each record by its GenericRecord writer schema,
    // mirroring GenericRecordBasedKafkaSchemaChangeInjector.
    class TestInjector extends KafkaSchemaChangeInjector<Schema> {

      @Override
      protected Schema getSchemaIdentifier(DecodeableKafkaRecord consumerRecord) {
        return ((GenericRecord) consumerRecord.getValue()).getSchema();
      }
    }

    KafkaSchemaChangeInjector schemaChangeInjector = new TestInjector();
    WorkUnitState wus = new WorkUnitState();
    // Use the in-memory mock registry so tests need no live schema-registry service.
    wus.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS,
        KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
    schemaChangeInjector.init(wus);

    // Four distinct schemas differing only in the name of the optional field.
    Schema schema1 = new Schema.Parser().parse(SCHEMA1);
    Schema schema2 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY2"));
    Schema schema3 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY3"));
    Schema schema4 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY4"));

    schemaChangeInjector.setInputGlobalMetadata(GlobalMetadata.<Schema>builder().schema(schema1).build(),
        null);

    // Seed the registry so schema1 is the "latest" schema for the topic.
    schemaChangeInjector.getSchemaRegistry().register(schema1, datasetName);

    DecodeableKafkaRecord record1 = getMock(datasetName, getRecord(schema1, "name1"));
    DecodeableKafkaRecord record2 = getMock(datasetName, getRecord(schema2, "name1"));
    DecodeableKafkaRecord record3 = getMock(datasetName, getRecord(schema3, "name1"));
    DecodeableKafkaRecord record4 = getMock(datasetName, getRecord(schema4, "name1"));

    // first message will always trigger injection
    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 0);
    Assert.assertNotNull(schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record1), wus));

    // next messages should not trigger injection since there is no schema change
    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record1), wus));
    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record1), wus));
    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 1);

    // A new record schema (schema2) forces a registry lookup and grows the cache, but the
    // latest registered schema is unchanged, so no message is injected.
    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record2), wus));
    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 2);

    // updating the latest schema should result in an injection
    schemaChangeInjector.getSchemaRegistry().register(schema4, datasetName);

    // record3's schema is unseen, so the lookup happens and the changed latest schema
    // (schema4) is detected.
    Iterable<ControlMessage<DecodeableKafkaRecord>> iterable =
        schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record3), wus);

    Assert.assertNotNull(iterable);

    List<ControlMessage<DecodeableKafkaRecord>> controlMessages = Lists.newArrayList(iterable);

    Assert.assertEquals(controlMessages.size(), 1);

    // Should not see any injections since no schema update after the last call
    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new RecordEnvelope<>(record4), wus));
  }

  // Builds a mock Kafka record carrying the given Avro record as its decoded value.
  private DecodeableKafkaRecord getMock(String datasetName, GenericRecord record) {
    DecodeableKafkaRecord mockRecord = Mockito.mock(DecodeableKafkaRecord.class);
    Mockito.when(mockRecord.getValue()).thenReturn(record);
    Mockito.when(mockRecord.getTopic()).thenReturn(datasetName);
    Mockito.when(mockRecord.getPartition()).thenReturn(1);
    return mockRecord;
  }

  // Builds a GenericRecord of the given schema with only the "name" field populated.
  private GenericRecord getRecord(Schema schema, String name) {
    GenericRecord record = new GenericData.Record(schema);
    record.put("name", name);

    return record;
  }
}
\ No newline at end of file