This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4fb83cc [GOBBLIN-1397] Make schema check configurable
4fb83cc is described below
commit 4fb83cccec15739d4afe54aa288666ef81852156
Author: vbohra <[email protected]>
AuthorDate: Fri Feb 26 14:10:26 2021 -0800
[GOBBLIN-1397] Make schema check configurable
Closes #3235 from
vikrambohra/configurableSchemaCheck
---
.../gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 7bd669f..9e14539 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -54,6 +54,8 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
private static final int CONFIG_KAFKA_FETCH_REQUEST_MIN_BYTES_DEFAULT = 1024;
public static final String CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE = CONFIG_PREFIX
+ "socketTimeoutMillis";
public static final int CONFIG_KAFKA_SOCKET_TIMEOUT_VALUE_DEFAULT = 30000;
// 30 seconds
+ public static final String CONFIG_ENABLE_SCHEMA_CHECK = CONFIG_PREFIX +
"enableSchemaCheck";
+ public static final boolean ENABLE_SCHEMA_CHECK_DEFAULT = false;
protected final List<String> brokers;
protected final int fetchTimeoutMillis;
@@ -61,6 +63,7 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
protected final int socketTimeoutMillis;
protected final Config config;
protected Optional<KafkaSchemaRegistry> schemaRegistry;
+ protected final boolean schemaCheckEnabled;
public AbstractBaseKafkaConsumerClient(Config config) {
this.config = config;
@@ -78,6 +81,7 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
Preconditions.checkArgument((this.fetchTimeoutMillis <
this.socketTimeoutMillis),
"Kafka Source configuration error: FetchTimeout " +
this.fetchTimeoutMillis
+ " must be smaller than SocketTimeout " +
this.socketTimeoutMillis);
+ this.schemaCheckEnabled = ConfigUtils.getBoolean(config,
CONFIG_ENABLE_SCHEMA_CHECK, ENABLE_SCHEMA_CHECK_DEFAULT);
}
@Override
@@ -98,7 +102,7 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
}
private boolean isSchemaPresent(String topic) {
- if(isSchemaRegistryConfigured()) {
+ if(this.schemaCheckEnabled && isSchemaRegistryConfigured()) {
try {
if(this.schemaRegistry.get().getLatestSchemaByTopic(topic) == null) {
log.warn(String.format("Schema not found for topic %s skipping.",
topic));