This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4fb83cc  [GOBBLIN-1397] Make schema check configurable
4fb83cc is described below

commit 4fb83cccec15739d4afe54aa288666ef81852156
Author: vbohra <[email protected]>
AuthorDate: Fri Feb 26 14:10:26 2021 -0800

    [GOBBLIN-1397] Make schema check configurable
    
    Closes #3235 from vikrambohra/configurableSchemaCheck
---
 .../gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java       | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 7bd669f..9e14539 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -54,6 +54,8 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
   private static final int CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT = 1024;
   public static final String CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE = CONFIG_PREFIX 
+ "socketTimeoutMillis";
   public static final int CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT = 30000; 
// 30 seconds
+  public static final String CONFIG_ENABLE_SCHEMA_CHECK = CONFIG_PREFIX + 
"enableSchemaCheck";
+  public static final boolean ENABLE_SCHEMA_CHECK_DEFAULT = false;
 
   protected final List<String> brokers;
   protected final int fetchTimeoutMillis;
@@ -61,6 +63,7 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
   protected final int socketTimeoutMillis;
   protected final Config config;
   protected Optional<KafkaSchemaRegistry> schemaRegistry;
+  protected final boolean schemaCheckEnabled;
 
   public AbstractBaseKafkaConsumerClient(Config config) {
     this.config = config;
@@ -78,6 +81,7 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
     Preconditions.checkArgument((this.fetchTimeoutMillis < 
this.socketTimeoutMillis),
         "Kafka Source configuration error: FetchTimeout " + 
this.fetchTimeoutMillis
             + " must be smaller than SocketTimeout " + 
this.socketTimeoutMillis);
+    this.schemaCheckEnabled = ConfigUtils.getBoolean(config, 
CONFIG_ENABLE_SCHEMA_CHECK, ENABLE_SCHEMA_CHECK_DEFAULT);
   }
 
   @Override
@@ -98,7 +102,7 @@ public abstract class AbstractBaseKafkaConsumerClient implements GobblinKafkaCon
   }
 
   private boolean isSchemaPresent(String topic) {
-    if(isSchemaRegistryConfigured()) {
+    if(this.schemaCheckEnabled && isSchemaRegistryConfigured()) {
       try {
         if(this.schemaRegistry.get().getLatestSchemaByTopic(topic) == null) {
           log.warn(String.format("Schema not found for topic %s skipping.", 
topic));

Reply via email to