Repository: incubator-gobblin Updated Branches: refs/heads/master 75f725eda -> 249fe6234
[GOBBLIN-433] Skip schema download for empty workunit Not getting schema for topic partition if the gap between high and low watermark is 0 because it might be a deleted topic and it is just an empty Workunit. Writing tests which cover the fix and fixing the failing tests. Closes #2310 from treff7es/skip_schema_download_for_empty_workunit Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/249fe623 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/249fe623 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/249fe623 Branch: refs/heads/master Commit: 249fe6234558ca7f4f72a6fb80b5604f2224670d Parents: 75f725e Author: treff7es <[email protected]> Authored: Fri May 4 10:59:22 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Fri May 4 10:59:22 2018 -0700 ---------------------------------------------------------------------- .../kafka/KafkaDeserializerExtractor.java | 24 ++++-- .../kafka/KafkaDeserializerExtractorTest.java | 84 +++++++++++++------- 2 files changed, 71 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249fe623/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java index 74eafa8..b4a7858 100644 --- a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java +++ 
b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractor.java @@ -17,28 +17,29 @@ package org.apache.gobblin.source.extractor.extract.kafka; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaJsonDeserializer; - import java.io.IOException; import java.util.Properties; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Getter; - import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; import org.apache.commons.lang3.reflect.ConstructorUtils; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Enums; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaJsonDeserializer; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Getter; + import org.apache.gobblin.annotation.Alias; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; @@ -47,7 +48,6 @@ import org.apache.gobblin.metrics.kafka.SchemaRegistryException; import org.apache.gobblin.util.AvroUtils; import org.apache.gobblin.util.PropertiesUtils; - /** * <p> * Extension of {@link KafkaExtractor} that wraps Kafka's {@link Deserializer} API. 
Kafka's {@link Deserializer} provides @@ -72,6 +72,7 @@ import org.apache.gobblin.util.PropertiesUtils; @Getter(AccessLevel.PACKAGE) @Alias(value = "DESERIALIZER") public class KafkaDeserializerExtractor extends KafkaExtractor<Object, Object> { + private static final Logger LOG = LoggerFactory.getLogger(KafkaDeserializerExtractor.class); public static final String KAFKA_DESERIALIZER_TYPE = "kafka.deserializer.type"; @@ -114,6 +115,13 @@ public class KafkaDeserializerExtractor extends KafkaExtractor<Object, Object> { @Override public Object getSchema() { try { + + LOG.info("Getting schema for {}. Gap: {} HighWaterMark: {}", this.topicName, this.lowWatermark.getGap(this.highWatermark)); + //If HighWatermark equals LowWatermark that might mean the workunit is an empty workunit + if (this.lowWatermark.getGap(this.highWatermark) == 0) { + LOG.info("Not getting schema for {} as the gap between high and low watermark is 0", this.topicName); + return null; + } return this.kafkaSchemaRegistry.getLatestSchemaByTopic(this.topicName); } catch (SchemaRegistryException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/249fe623/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java index cdf7b51..1a273ec 100644 --- a/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java +++ b/gobblin-modules/gobblin-kafka-08/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaDeserializerExtractorTest.java @@ -17,29 +17,11 @@ package 
org.apache.gobblin.source.extractor.extract.kafka; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; -import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; -import io.confluent.kafka.serializers.KafkaAvroSerializer; -import io.confluent.kafka.serializers.KafkaAvroDeserializer; -import io.confluent.kafka.serializers.KafkaJsonDeserializer; -import io.confluent.kafka.serializers.KafkaJsonSerializer; - import java.io.IOException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import kafka.message.Message; -import kafka.message.MessageAndOffset; -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; - import org.apache.avro.Schema; import org.apache.avro.SchemaBuilder; import org.apache.avro.generic.GenericRecord; @@ -51,6 +33,20 @@ import org.testng.annotations.Test; import com.google.common.base.Optional; +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; +import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; +import io.confluent.kafka.serializers.KafkaAvroDeserializer; +import io.confluent.kafka.serializers.KafkaAvroSerializer; +import io.confluent.kafka.serializers.KafkaJsonDeserializer; +import io.confluent.kafka.serializers.KafkaJsonSerializer; +import kafka.message.Message; +import kafka.message.MessageAndOffset; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; @@ -63,6 +59,12 @@ import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaDeserializerExtrac import org.apache.gobblin.source.workunit.WorkUnit; import org.apache.gobblin.util.PropertiesUtils; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + @Test(groups = { "gobblin.source.extractor.extract.kafka" }) public class KafkaDeserializerExtractorTest { @@ -76,7 +78,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testDeserializeRecord() throws IOException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L); String testString = "Hello World"; ByteBuffer testStringByteBuffer = ByteBuffer.wrap(testString.getBytes(StandardCharsets.UTF_8)); @@ -96,7 +98,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testBuiltInStringDeserializer() throws ReflectiveOperationException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE, KafkaDeserializerExtractor.Deserializers.STRING.name()); @@ -110,7 +112,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testBuiltInGsonDeserializer() throws ReflectiveOperationException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE, KafkaDeserializerExtractor.Deserializers.GSON.name()); @@ -124,7 +126,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testBuiltInConfluentAvroDeserializer() throws ReflectiveOperationException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); 
mockWorkUnitState.setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE, KafkaDeserializerExtractor.Deserializers.CONFLUENT_AVRO.name()); @@ -146,7 +148,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testCustomDeserializer() throws ReflectiveOperationException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); mockWorkUnitState .setProp(KafkaDeserializerExtractor.KAFKA_DESERIALIZER_TYPE, KafkaJsonDeserializer.class.getName()); mockWorkUnitState @@ -159,7 +161,8 @@ public class KafkaDeserializerExtractorTest { @Test public void testConfluentAvroDeserializer() throws IOException, RestClientException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L,10L); + mockWorkUnitState.setProp("schema.registry.url", TEST_URL); Schema schema = SchemaBuilder.record(TEST_RECORD_NAME) @@ -190,7 +193,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testConfluentAvroDeserializerForSchemaEvolution() throws IOException, RestClientException, SchemaRegistryException { - WorkUnitState mockWorkUnitState = getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); mockWorkUnitState.setProp("schema.registry.url", TEST_URL); Schema schemaV1 = SchemaBuilder.record(TEST_RECORD_NAME) @@ -219,6 +222,7 @@ public class KafkaDeserializerExtractorTest { KafkaDeserializerExtractor kafkaDecoderExtractor = new KafkaDeserializerExtractor(mockWorkUnitState, Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, mockKafkaSchemaRegistry); + when(kafkaDecoderExtractor.getSchema()).thenReturn(schemaV2); ByteArrayBasedKafkaRecord mockMessageAndOffset = getMockMessageAndOffset(testGenericRecordByteBuffer); @@ -230,7 +234,7 @@ public class KafkaDeserializerExtractorTest { @Test public void testConfluentJsonDeserializer() throws IOException { - WorkUnitState mockWorkUnitState = 
getMockWorkUnitState(); + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 10L); mockWorkUnitState.setProp("json.value.type", KafkaRecord.class.getName()); KafkaRecord testKafkaRecord = new KafkaRecord("Hello World"); @@ -252,10 +256,10 @@ public class KafkaDeserializerExtractorTest { Assert.assertEquals(kafkaDecoderExtractor.decodeRecord(mockMessageAndOffset), testKafkaRecord); } - private WorkUnitState getMockWorkUnitState() { + private WorkUnitState getMockWorkUnitState(Long lowWaterMark, Long highWaterMark) { WorkUnit mockWorkUnit = WorkUnit.createEmpty(); - mockWorkUnit.setWatermarkInterval(new WatermarkInterval(new MultiLongWatermark(new ArrayList<Long>()), - new MultiLongWatermark(new ArrayList<Long>()))); + mockWorkUnit.setWatermarkInterval(new WatermarkInterval(new MultiLongWatermark(new ArrayList<Long>(){{add(lowWaterMark);}}), + new MultiLongWatermark(new ArrayList<Long>(){{add(highWaterMark);}}))); WorkUnitState mockWorkUnitState = new WorkUnitState(mockWorkUnit, new State()); mockWorkUnitState.setProp(KafkaSource.TOPIC_NAME, TEST_TOPIC_NAME); @@ -266,6 +270,28 @@ public class KafkaDeserializerExtractorTest { return mockWorkUnitState; } + @Test + public void testConfluentShouldNotQuerySchemaRegistryWhenTheGapIsZero() + throws IOException, RestClientException, SchemaRegistryException { + WorkUnitState mockWorkUnitState = getMockWorkUnitState(0L, 0L); + mockWorkUnitState.setProp("schema.registry.url", TEST_URL); + + + SchemaRegistryClient mockSchemaRegistryClient = mock(SchemaRegistryClient.class); + + Deserializer<Object> kafkaDecoder = new KafkaAvroDeserializer(mockSchemaRegistryClient); + + KafkaSchemaRegistry<Integer, Schema> mockKafkaSchemaRegistry = mock(KafkaSchemaRegistry.class); + + KafkaDeserializerExtractor kafkaDecoderExtractor = new KafkaDeserializerExtractor(mockWorkUnitState, + Optional.fromNullable(Deserializers.CONFLUENT_AVRO), kafkaDecoder, mockKafkaSchemaRegistry); + + verify(mockKafkaSchemaRegistry, 
never()).getLatestSchemaByTopic(any()); + + kafkaDecoderExtractor.getSchema(); + + } + private ByteArrayBasedKafkaRecord getMockMessageAndOffset(ByteBuffer payload) { MessageAndOffset mockMessageAndOffset = mock(MessageAndOffset.class); Message mockMessage = mock(Message.class);
