This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f21bc15  [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
f21bc15 is described below

commit f21bc150393119cb1463b85f2b31204069376a53
Author: vbohra <[email protected]>
AuthorDate: Sun Feb 21 12:49:08 2021 -0800

    [GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
    
    Closes #3231 from
    vikrambohra/schemaRegistryInKafkaSource
---
 .../client/AbstractBaseKafkaConsumerClient.java    | 33 ++++++++++++++++++++--
 1 file changed, 31 insertions(+), 2 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 63d81fe..7bd669f 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.regex.Pattern;
 
+import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -28,8 +29,11 @@ import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
 
 import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.DatasetFilterUtils;
@@ -38,6 +42,7 @@ import org.apache.gobblin.util.DatasetFilterUtils;
 /**
  * A base {@link GobblinKafkaConsumerClient} that sets configurations shared 
by all {@link GobblinKafkaConsumerClient}s
  */
+@Slf4j
 public abstract class AbstractBaseKafkaConsumerClient implements 
GobblinKafkaConsumerClient {
 
   public static final String CONFIG_NAMESPACE = "source.kafka";
@@ -54,13 +59,15 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
   protected final int fetchTimeoutMillis;
   protected final int fetchMinBytes;
   protected final int socketTimeoutMillis;
+  protected final Config config;
+  protected Optional<KafkaSchemaRegistry> schemaRegistry;
 
   public AbstractBaseKafkaConsumerClient(Config config) {
+    this.config = config;
     this.brokers = ConfigUtils.getStringList(config, 
ConfigurationKeys.KAFKA_BROKERS);
     if (this.brokers.isEmpty()) {
       throw new IllegalArgumentException("Need to specify at least one Kafka 
broker.");
     }
-
     this.socketTimeoutMillis =
         ConfigUtils.getInt(config, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE, 
CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT);
     this.fetchTimeoutMillis =
@@ -78,11 +85,33 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
     return Lists.newArrayList(Iterables.filter(getTopics(), new 
Predicate<KafkaTopic>() {
       @Override
       public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
-        return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, 
whitelist);
+        return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist, 
whitelist) && isSchemaPresent(kafkaTopic.getName());
       }
     }));
   }
 
+  /**
+   * Reports whether a {@link KafkaSchemaRegistry} is available to this client, resolving it lazily.
+   *
+   * <p>On first use the registry is built from {@link #config} only if both
+   * {@link KafkaSchemaRegistry#KAFKA_SCHEMA_REGISTRY_CLASS} and
+   * {@link KafkaSchemaRegistry#KAFKA_SCHEMA_REGISTRY_URL} are set. Otherwise {@link Optional#absent()}
+   * is cached. Once assigned, {@link #schemaRegistry} is reused by later calls.
+   *
+   * <p>NOTE(review): the lazy initialization is unsynchronized; confirm this is never called concurrently.
+   *
+   * @return {@code true} if a schema registry has been configured for this client
+   */
+  private boolean isSchemaRegistryConfigured() {
+    if (this.schemaRegistry != null) {
+      return this.schemaRegistry.isPresent();
+    }
+    boolean registryConfigured = this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS)
+        && this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL);
+    if (registryConfigured) {
+      this.schemaRegistry = Optional.of(KafkaSchemaRegistry.get(ConfigUtils.configToProperties(this.config)));
+    } else {
+      this.schemaRegistry = Optional.absent();
+    }
+    return this.schemaRegistry.isPresent();
+  }
+
+  /**
+   * Decides whether {@code topic} survives schema-based filtering.
+   *
+   * <p>If no schema registry is configured, every topic passes. Otherwise a topic is rejected when the
+   * registry returns {@code null} for its latest schema, or when the lookup throws a
+   * {@link SchemaRegistryException}. In that case the failure is logged together with its cause, rather
+   * than being reported as a missing schema.
+   *
+   * @param topic the Kafka topic name to look up
+   * @return {@code false} if the topic should be skipped because its schema is unavailable, {@code true} otherwise
+   */
+  private boolean isSchemaPresent(String topic) {
+    if (!isSchemaRegistryConfigured()) {
+      // No registry to consult, so the topic is kept.
+      return true;
+    }
+    try {
+      if (this.schemaRegistry.get().getLatestSchemaByTopic(topic) == null) {
+        log.warn("Schema not found for topic {}, skipping.", topic);
+        return false;
+      }
+    } catch (SchemaRegistryException e) {
+      // A failed lookup is not the same as a missing schema; keep the cause so the failure can be diagnosed.
+      log.warn("Failed to retrieve schema for topic {}, skipping.", topic, e);
+      return false;
+    }
+    return true;
+  }
+
   /**
    * A helper method that returns the canonical metric name for a kafka 
metric. A typical canonicalized metric name would
    * be of the following format: "{metric-group}_{client-id}_{metric-name}". 
This method is invoked in {@link GobblinKafkaConsumerClient#getMetrics()}

Reply via email to