This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 78da5dd  [GOBBLIN-1345] Implement a Kafka schema change detecting 
converter
78da5dd is described below

commit 78da5dd29b4e5e400c21dd78b6044841372771e5
Author: suvasude <[email protected]>
AuthorDate: Wed Jan 6 09:16:05 2021 -0800

    [GOBBLIN-1345] Implement a Kafka schema change detecting converter
    
    Closes #3182 from sv2000/kafkaSchemaChangeInjector
---
 ...enericRecordBasedKafkaSchemaChangeInjector.java |  36 ++++++
 .../converter/KafkaSchemaChangeInjector.java       | 125 +++++++++++++++++++++
 .../converter/KafkaSchemaChangeInjectorTest.java   | 125 +++++++++++++++++++++
 3 files changed, 286 insertions(+)

diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
new file mode 100644
index 0000000..280b771
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/GenericRecordBasedKafkaSchemaChangeInjector.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+
+
+/**
+ * A {@link org.apache.gobblin.stream.ControlMessageInjector} that detects 
changes in the latest schema and notifies downstream constructs by
+ * injecting a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}.
+ * Also supports multi-dataset schema changes.
+ */
+public class GenericRecordBasedKafkaSchemaChangeInjector extends 
KafkaSchemaChangeInjector<Schema> {
+  @Override
+  protected Schema getSchemaIdentifier(DecodeableKafkaRecord consumerRecord) {
+    GenericRecord genericRecord = (GenericRecord) consumerRecord.getValue();
+    return genericRecord.getSchema();
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
new file mode 100644
index 0000000..42bf428
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/converter/KafkaSchemaChangeInjector.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter;
+
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.avro.Schema;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Optional;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import lombok.AccessLevel;
+import lombok.Getter;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metadata.GlobalMetadata;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.ControlMessageInjector;
+import org.apache.gobblin.stream.MetadataUpdateControlMessage;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+
+/**
+ * A {@link org.apache.gobblin.stream.ControlMessageInjector} that 
detects changes in the latest schema and notifies downstream constructs by
+ * injecting a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage}.
+ */
+public abstract class KafkaSchemaChangeInjector<S>
+    extends ControlMessageInjector<Schema, DecodeableKafkaRecord> {
+  @VisibleForTesting
+  @Getter(AccessLevel.PACKAGE)
+  private KafkaSchemaRegistry<String, Schema> schemaRegistry;
+
+  @VisibleForTesting
+  @Getter(AccessLevel.PACKAGE)
+  private Cache<S, String> schemaCache;
+  private Schema latestSchema;
+  private GlobalMetadata<Schema> globalMetadata;
+
+  // classes that extend this need to implement getSchemaIdentifier
+  protected abstract S getSchemaIdentifier(DecodeableKafkaRecord 
consumerRecord);
+
+  @Override
+  public ControlMessageInjector<Schema, DecodeableKafkaRecord> 
init(WorkUnitState workUnitState) {
+    this.schemaRegistry = 
KafkaSchemaRegistry.get(workUnitState.getProperties());
+    this.schemaCache = CacheBuilder.newBuilder().expireAfterAccess(1, 
TimeUnit.HOURS).build();
+    return this;
+  }
+
+  @Override
+  public void setInputGlobalMetadata(GlobalMetadata<Schema> 
inputGlobalMetadata, WorkUnitState workUnitState) {
+    this.globalMetadata = inputGlobalMetadata;
+  }
+
+  /**
+   * Inject a {@link org.apache.gobblin.stream.MetadataUpdateControlMessage} 
if the latest schema has changed. Check whether there is a new latest
+   * schema if the input record's schema is not present in the schema cache.
+   *
+   * @param inputRecordEnvelope input record envelope
+   * @param workUnitState work unit state
+   * @return the injected messages, or {@code null} if no schema change was detected
+   */
+  @Override
+  public Iterable<ControlMessage<DecodeableKafkaRecord>> 
injectControlMessagesBefore(
+      RecordEnvelope<DecodeableKafkaRecord> inputRecordEnvelope, WorkUnitState 
workUnitState) {
+    DecodeableKafkaRecord consumerRecord = inputRecordEnvelope.getRecord();
+    S schemaIdentifier = getSchemaIdentifier(consumerRecord);
+    String topicName = consumerRecord.getTopic();
+
+    // If a new schema is seen then check whether the latest schema in the registry 
has changed.
+    // Only check for the latest schema when a new schema is seen since the 
call to get the latest schema is not
+    // cacheable and is expensive.
+    if (this.schemaCache.getIfPresent(schemaIdentifier) == null) {
+      try {
+        Schema latestSchema = 
this.schemaRegistry.getLatestSchemaByTopic(topicName);
+
+        this.schemaCache.put(schemaIdentifier, "");
+
+        // latest schema changed, so inject a metadata update control message
+        if (!latestSchema.equals(this.latestSchema)) {
+          // update the metadata in this injector since the control message is 
only applied downstream
+          this.globalMetadata = 
GlobalMetadata.builderWithInput(this.globalMetadata, 
Optional.of(latestSchema)).build();
+          // update the latestSchema
+          this.latestSchema = latestSchema;
+          // inject a metadata update control message before the record so 
that the downstream constructs
+          // are aware of the new schema before processing the record
+          ControlMessage datasetLevelMetadataUpdate = new 
MetadataUpdateControlMessage(this.globalMetadata);
+
+          return Collections.singleton(datasetLevelMetadataUpdate);
+        }
+      } catch (SchemaRegistryException e) {
+        throw new RuntimeException("Exception when getting the latest schema 
for topic " + topicName, e);
+      }
+    }
+
+    // no schema change detected
+    return null;
+  }
+
+  @Override
+  public Iterable<ControlMessage<DecodeableKafkaRecord>> 
injectControlMessagesAfter(
+      RecordEnvelope<DecodeableKafkaRecord> inputRecordEnvelope, WorkUnitState 
workUnitState) {
+    return null;
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
new file mode 100644
index 0000000..c73e7a9
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/converter/KafkaSchemaChangeInjectorTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.converter;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.mockito.Mockito;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.Lists;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
+import org.apache.gobblin.metadata.GlobalMetadata;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamTestUtils;
+import org.apache.gobblin.stream.ControlMessage;
+import org.apache.gobblin.stream.RecordEnvelope;
+
+public class KafkaSchemaChangeInjectorTest {
+  // Test Avro schemas
+  private static final String SCHEMA1 = "{\"namespace\": \"example.avro\",\n" +
+      " \"type\": \"record\",\n" +
+      " \"name\": \"user\",\n" +
+      " \"fields\": [\n" +
+      "     {\"name\": \"name\", \"type\": \"string\"},\n" +
+      "     {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" +
+      " ]\n" +
+      "}";
+
+  @Test
+  public void testInjection() throws SchemaRegistryException, IOException {
+
+    String datasetName = "topic1";
+
+    class TestInjector extends KafkaSchemaChangeInjector<Schema> {
+
+      @Override
+      protected Schema getSchemaIdentifier(DecodeableKafkaRecord 
consumerRecord) {
+        return ((GenericRecord) consumerRecord.getValue()).getSchema();
+      }
+    }
+
+    KafkaSchemaChangeInjector schemaChangeInjector = new TestInjector();
+    WorkUnitState wus = new WorkUnitState();
+    wus.setProp(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS, 
KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
+    schemaChangeInjector.init(wus);
+
+    Schema schema1 = new Schema.Parser().parse(SCHEMA1);
+    Schema schema2 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", 
"DUMMY2"));
+    Schema schema3 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", 
"DUMMY3"));
+    Schema schema4 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", 
"DUMMY4"));
+
+    
schemaChangeInjector.setInputGlobalMetadata(GlobalMetadata.<Schema>builder().schema(schema1).build(),
 null);
+
+    schemaChangeInjector.getSchemaRegistry().register(schema1, datasetName);
+
+    DecodeableKafkaRecord record1 = getMock(datasetName, getRecord(schema1, 
"name1"));
+    DecodeableKafkaRecord record2 = getMock(datasetName, getRecord(schema2, 
"name1"));
+    DecodeableKafkaRecord record3 = getMock(datasetName, getRecord(schema3, 
"name1"));
+    DecodeableKafkaRecord record4 = getMock(datasetName, getRecord(schema4, 
"name1"));
+
+    // first message will always trigger injection
+    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 0);
+    Assert.assertNotNull(schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record1), wus));
+
+    // next messages should not trigger injection since there is no schema 
change
+    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record1), wus));
+    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record1), wus));
+    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 1);
+
+    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record2), wus));
+    Assert.assertEquals(schemaChangeInjector.getSchemaCache().size(), 2);
+
+    // updating the latest schema should result in an injection
+    schemaChangeInjector.getSchemaRegistry().register(schema4, datasetName);
+
+    Iterable<ControlMessage<DecodeableKafkaRecord>> iterable =
+        schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record3), wus);
+
+    Assert.assertNotNull(iterable);
+
+    List<ControlMessage<DecodeableKafkaRecord>> controlMessages = 
Lists.newArrayList(iterable);
+
+    Assert.assertEquals(controlMessages.size(), 1);
+
+    // Should not see any injections since no schema update after the last call
+    Assert.assertNull(schemaChangeInjector.injectControlMessagesBefore(new 
RecordEnvelope<>(record4), wus));
+  }
+
+  private DecodeableKafkaRecord getMock(String datasetName, GenericRecord 
record) {
+    DecodeableKafkaRecord mockRecord = 
Mockito.mock(DecodeableKafkaRecord.class);
+    Mockito.when(mockRecord.getValue()).thenReturn(record);
+    Mockito.when(mockRecord.getTopic()).thenReturn(datasetName);
+    Mockito.when(mockRecord.getPartition()).thenReturn(1);
+    return mockRecord;
+  }
+
+  private GenericRecord getRecord(Schema schema, String name) {
+    GenericRecord record = new GenericData.Record(schema);
+    record.put("name", name);
+
+    return record;
+  }
+}
\ No newline at end of file

Reply via email to