This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 48cfae4 [GOBBLIN-1400] Added comments to schemaCheck methods
48cfae4 is described below
commit 48cfae4ab57c443db3c11b5029e454563ac7bfab
Author: vbohra <[email protected]>
AuthorDate: Mon Mar 1 17:37:30 2021 -0800
[GOBBLIN-1400] Added comments to schemaCheck methods
Closes #3237 from vikrambohra/addComments
---
.../kafka/client/AbstractBaseKafkaConsumerClient.java | 13 +++++++++++++
1 file changed, 13 insertions(+)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
index 9e14539..2a0bc2b 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/kafka/client/AbstractBaseKafkaConsumerClient.java
@@ -84,6 +84,14 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
this.schemaCheckEnabled = ConfigUtils.getBoolean(config,
CONFIG_ENABLE_SCHEMA_CHECK, ENABLE_SCHEMA_CHECK_DEFAULT);
}
+ /**
+ * Filter topics based on whitelist and blacklist patterns
+ * and if {@link #schemaCheckEnabled}, also filter on whether schema is
present in schema registry
+ * @param blacklist - List of regex patterns that need to be blacklisted
+ * @param whitelist - List of regex patterns that need to be whitelisted
+ *
+ * @return
+ */
@Override
public List<KafkaTopic> getFilteredTopics(final List<Pattern> blacklist,
final List<Pattern> whitelist) {
return Lists.newArrayList(Iterables.filter(getTopics(), new
Predicate<KafkaTopic>() {
@@ -101,6 +109,11 @@ public abstract class AbstractBaseKafkaConsumerClient
implements GobblinKafkaCon
return this.schemaRegistry.isPresent();
}
+ /**
+ * accept topic if {@link #schemaCheckEnabled} and schema registry is
configured
+ * @param topic
+ * @return
+ */
private boolean isSchemaPresent(String topic) {
if(this.schemaCheckEnabled && isSchemaRegistryConfigured()) {
try {