This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f21bc15 [GOBBLIN-1391] Filter Kafka topics in kafka consumer client
with No s…
f21bc15 is described below
commit f21bc150393119cb1463b85f2b31204069376a53
Author: vbohra <[email protected]>
AuthorDate: Sun Feb 21 12:49:08 2021 -0800
[GOBBLIN-1391] Filter Kafka topics in kafka consumer client with No s…
Closes #3231 from
vikrambohra/schemaRegistryInKafkaSource
---
.../client/AbstractBaseKafkaConsumerClient.java | 33 ++++++++++++++++++++--
1 file changed, 31 insertions(+), 2 deletions(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 63d81fe..7bd669f 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
+import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -28,8 +29,11 @@ import com.google.common.collect.Lists;
import com.typesafe.config.Config;
import javax.annotation.Nonnull;
+import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
+import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.DatasetFilterUtils;
@@ -38,6 +42,7 @@ import org.apache.gobblin.util.DatasetFilterUtils;
/**
* A base {@link GobblinKafkaConsumerClient} that sets configurations shared
by all {@link GobblinKafkaConsumerClient}s
*/
+@Slf4j
public abstract class AbstractBaseKafkaConsumerClient implements
GobblinKafkaConsumerClient {
public static final String CONFIG_NAMESPACE = "source.kafka";
@@ -54,13 +59,15 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
protected final int fetchTimeoutMillis;
protected final int fetchMinBytes;
protected final int socketTimeoutMillis;
+ protected final Config config;
+ protected Optional<KafkaSchemaRegistry> schemaRegistry;
public AbstractBaseKafkaConsumerClient(Config config) {
+ this.config = config;
this.brokers = ConfigUtils.getStringList(config,
ConfigurationKeys.KAFKA_BROKERS);
if (this.brokers.isEmpty()) {
throw new IllegalArgumentException("Need to specify at least one Kafka
broker.");
}
-
this.socketTimeoutMillis =
ConfigUtils.getInt(config, CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE,
CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT);
this.fetchTimeoutMillis =
@@ -78,11 +85,33 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
return Lists.newArrayList(Iterables.filter(getTopics(), new
Predicate<KafkaTopic>() {
@Override
public boolean apply(@Nonnull KafkaTopic kafkaTopic) {
- return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist,
whitelist);
+ return DatasetFilterUtils.survived(kafkaTopic.getName(), blacklist,
whitelist) && isSchemaPresent(kafkaTopic.getName());
}
}));
}
+ /**
+  * Reports whether a schema registry is available for this client, resolving it on first use.
+  *
+  * <p>On the first call the registry is instantiated only when the config contains both
+  * {@code KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS} and
+  * {@code KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL}; otherwise an absent value is cached.
+  * Later calls reuse the cached {@link Optional}.
+  *
+  * @return {@code true} if the cached {@link Optional} holds a registry
+  */
+ private boolean isSchemaRegistryConfigured() {
+   // Already resolved on an earlier call: answer from the cached value.
+   if (this.schemaRegistry != null) {
+     return this.schemaRegistry.isPresent();
+   }
+
+   boolean registryKeysPresent = this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS)
+       && this.config.hasPath(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_URL);
+   if (registryKeysPresent) {
+     this.schemaRegistry = Optional.of(KafkaSchemaRegistry.get(ConfigUtils.configToProperties(this.config)));
+   } else {
+     this.schemaRegistry = Optional.absent();
+   }
+   return this.schemaRegistry.isPresent();
+ }
+
+ /**
+  * Decides whether a topic passes the schema-availability filter.
+  *
+  * <p>When no schema registry is configured the check is a no-op and every topic passes.
+  * Otherwise the topic passes only if the registry returns a non-null latest schema for it.
+  * A {@link SchemaRegistryException} raised by the lookup is treated as "no schema": the topic
+  * is skipped and the exception is logged so the underlying cause is not lost.
+  *
+  * @param topic name of the Kafka topic to look up
+  * @return {@code true} if the topic should be kept, {@code false} if it should be skipped
+  */
+ private boolean isSchemaPresent(String topic) {
+   if (!isSchemaRegistryConfigured()) {
+     return true;
+   }
+   try {
+     if (this.schemaRegistry.get().getLatestSchemaByTopic(topic) != null) {
+       return true;
+     }
+     log.warn("Schema not found for topic {} skipping.", topic);
+   } catch (SchemaRegistryException e) {
+     // Pass the exception as the trailing argument so SLF4J records the cause and stack trace.
+     log.warn("Schema not found for topic {} skipping.", topic, e);
+   }
+   return false;
+ }
+
/**
* A helper method that returns the canonical metric name for a kafka
metric. A typical canonicalized metric name would
* be of the following format: "{metric-group}_{client-id}_{metric-name}".
This method is invoked in {@link GobblinKafkaConsumerClient#getMetrics()}