Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 75f725eda -> 249fe6234


[GOBBLIN-433] Skip schema download for empty workunit

Not getting schema for a topic partition if the gap
between high and low watermark is 0 because it
might be a deleted topic and it is just an empty
Workunit.

Writing tests which cover the fix and fixing the
failing tests.

Closes #2310 from
treff7es/skip_schema_download_for_empty_workunit


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/249fe623
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/249fe623
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/249fe623

Branch: refs/heads/master
Commit: 249fe6234558ca7f4f72a6fb80b5604f2224670d
Parents: 75f725e
Author: treff7es <[email protected]>
Authored: Fri May 4 10:59:22 2018 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Fri May 4 10:59:22 2018 -0700

----------------------------------------------------------------------
 .../kafka/KafkaDeserializerExtractor.java       | 24 ++++--
 .../kafka/KafkaDeserializerExtractorTest.java   | 84 +++++++++++++-------
 2 files changed, 71 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249fe623/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java
 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java
index 74eafa8..b4a7858 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java
@@ -17,28 +17,29 @@
 
 package org.apache.gobblin.source.extractor.extract.kafka;
 
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaJsonDeserializer;
-
 import java.io.IOException;
 import java.util.Properties;
 
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.lang3.reflect.ConstructorUtils;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Enums;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaJsonDeserializer;
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 import org.apache.gobblin.annotation.Alias;
 import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord;
@@ -47,7 +48,6 @@ import 
org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.util.AvroUtils;
 import org.apache.gobblin.util.PropertiesUtils;
 
-
 /**
  * <p>
  *   Extension of {@link KafkaExtractor} that wraps Kafka's {@link 
Deserializer} API. Kafka's {@link Deserializer} provides
@@ -72,6 +72,7 @@ import org.apache.gobblin.util.PropertiesUtils;
 @Getter(AccessLevel.PACKAGE)
 @Alias(value = "DESERIALIZER")
 public class KafkaDeserializerExtractor extends KafkaExtractor<Object, Object> 
{
+  private static final Logger LOG = 
LoggerFactory.getLogger(KafkaDeserializerExtractor.class);
 
   public static final String KAFKA_DESERIALIZER_TYPE = 
"kafka.deserializer.type";
 
@@ -114,6 +115,13 @@ public class KafkaDeserializerExtractor extends 
KafkaExtractor<Object, Object> {
   @Override
   public Object getSchema() {
     try {
+
+      LOG.info("Getting schema for {}. Gap: {} HighWaterMark: {}", 
this.topicName, this.lowWatermark.getGap(this.highWatermark));
+      //If HighWatermark equals LowWatermark that might mean the workunit is 
an empty workunit
+      if (this.lowWatermark.getGap(this.highWatermark) == 0) {
+        LOG.info("Not getting schema for {} as the gap between high and low 
watermark is 0", this.topicName);
+        return null;
+      }
       return this.kafkaSchemaRegistry.getLatestSchemaByTopic(this.topicName);
     } catch (SchemaRegistryException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249fe623/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
index cdf7b51..1a273ec 100644
--- 
a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
+++ 
b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java
@@ -17,29 +17,11 @@
 
 package org.apache.gobblin.source.extractor.extract.kafka;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
-import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
-import io.confluent.kafka.serializers.KafkaAvroSerializer;
-import io.confluent.kafka.serializers.KafkaAvroDeserializer;
-import io.confluent.kafka.serializers.KafkaJsonDeserializer;
-import io.confluent.kafka.serializers.KafkaJsonSerializer;
-
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 
-import kafka.message.Message;
-import kafka.message.MessageAndOffset;
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.avro.generic.GenericRecord;
@@ -51,6 +33,20 @@ import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 
+import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
+import 
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
+import io.confluent.kafka.serializers.KafkaAvroDeserializer;
+import io.confluent.kafka.serializers.KafkaAvroSerializer;
+import io.confluent.kafka.serializers.KafkaJsonDeserializer;
+import io.confluent.kafka.serializers.KafkaJsonSerializer;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -63,6 +59,12 @@ import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerExtrac
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.PropertiesUtils;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 
 @Test(groups = { "gobblin.source.extractor.extract.kafka" })
 public class KafkaDeserializerExtractorTest {
@@ -76,7 +78,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testDeserializeRecord() throws IOException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L);
 
     String testString = "Hello World";
     ByteBuffer testStringByteBuffer = 
ByteBuffer.wrap(testString.getBytes(StandardCharsets.UTF_8));
@@ -96,7 +98,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testBuiltInStringDeserializer() throws 
ReflectiveOperationException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     
mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE,
         KafkaDeserializerExtractor.Deserializers.STRING.name());
 
@@ -110,7 +112,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testBuiltInGsonDeserializer() throws 
ReflectiveOperationException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     
mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE,
         KafkaDeserializerExtractor.Deserializers.GSON.name());
 
@@ -124,7 +126,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testBuiltInConfluentAvroDeserializer() throws 
ReflectiveOperationException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     
mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE,
         KafkaDeserializerExtractor.Deserializers.CONFLUENT_AVRO.name());
 
@@ -146,7 +148,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testCustomDeserializer() throws ReflectiveOperationException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     mockWorkUnitState
         .setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE, 
KafkaJsonDeserializer.class.getName());
     mockWorkUnitState
@@ -159,7 +161,8 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testConfluentAvroDeserializer() throws IOException, 
RestClientException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L);
+
     mockWorkUnitState.setProp("schema.registry.url", TEST_URL);
 
     Schema schema = SchemaBuilder.record(TEST_RECORD_NAME)
@@ -190,7 +193,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testConfluentAvroDeserializerForSchemaEvolution() throws 
IOException, RestClientException, SchemaRegistryException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     mockWorkUnitState.setProp("schema.registry.url", TEST_URL);
 
     Schema schemaV1 = SchemaBuilder.record(TEST_RECORD_NAME)
@@ -219,6 +222,7 @@ public class KafkaDeserializerExtractorTest {
 
     KafkaDeserializerExtractor kafkaDecoderExtractor = new 
KafkaDeserializerExtractor(mockWorkUnitState,
         Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, 
mockKafkaSchemaRegistry);
+
     when(kafkaDecoderExtractor.getSchema()).thenReturn(schemaV2);
 
     ByteArrayBasedKafkaRecord mockMessageAndOffset = 
getMockMessageAndOffset(testGenericRecordByteBuffer);
@@ -230,7 +234,7 @@ public class KafkaDeserializerExtractorTest {
 
   @Test
   public void testConfluentJsonDeserializer() throws IOException {
-    WorkUnitState mockWorkUnitState = getMockWorkUnitState();
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L);
     mockWorkUnitState.setProp("json.value.type", KafkaRecord.class.getName());
 
     KafkaRecord testKafkaRecord = new KafkaRecord("Hello World");
@@ -252,10 +256,10 @@ public class KafkaDeserializerExtractorTest {
     
Assert.assertEquals(kafkaDecoderExtractor.decodeRecord(mockMessageAndOffset), 
testKafkaRecord);
   }
 
-  private WorkUnitState getMockWorkUnitState() {
+  private WorkUnitState getMockWorkUnitState(Long lowWaterMark, Long 
highWaterMark) {
     WorkUnit mockWorkUnit = WorkUnit.createEmpty();
-    mockWorkUnit.setWatermarkInterval(new WatermarkInterval(new 
MultiLongWatermark(new ArrayList<Long>()),
-        new MultiLongWatermark(new ArrayList<Long>())));
+    mockWorkUnit.setWatermarkInterval(new WatermarkInterval(new 
MultiLongWatermark(new ArrayList<Long>(){{add(lowWaterMark);}}),
+        new MultiLongWatermark(new ArrayList<Long>(){{add(highWaterMark);}})));
 
     WorkUnitState mockWorkUnitState = new WorkUnitState(mockWorkUnit, new 
State());
     mockWorkUnitState.setProp(KafkaSource.TOPIC_NAME, TEST_TOPIC_NAME);
@@ -266,6 +270,28 @@ public class KafkaDeserializerExtractorTest {
     return mockWorkUnitState;
   }
 
+  @Test
+  public void testConfluentShouldNotQuerySchemaRegistryWhenTheGapIsZero()
+      throws IOException, RestClientException, SchemaRegistryException {
+    WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 0L);
+    mockWorkUnitState.setProp("schema.registry.url", TEST_URL);
+
+
+    SchemaRegistryClient mockSchemaRegistryClient = 
mock(SchemaRegistryClient.class);
+
+    Deserializer<Object> kafkaDecoder = new 
KafkaAvroDeserializer(mockSchemaRegistryClient);
+
+    KafkaSchemaRegistry<Integer, Schema> mockKafkaSchemaRegistry = 
mock(KafkaSchemaRegistry.class);
+
+    KafkaDeserializerExtractor kafkaDecoderExtractor = new 
KafkaDeserializerExtractor(mockWorkUnitState,
+        Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, 
mockKafkaSchemaRegistry);
+
+    verify(mockKafkaSchemaRegistry, never()).getLatestSchemaByTopic(any());
+
+    kafkaDecoderExtractor.getSchema();
+
+  }
+
   private ByteArrayBasedKafkaRecord getMockMessageAndOffset(ByteBuffer 
payload) {
     MessageAndOffset mockMessageAndOffset = mock(MessageAndOffset.class);
     Message mockMessage = mock(Message.class);

Reply via email to