This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 22e984ba7 [GOBBLIN-1894] Add ability to filter topics with a period in kafka source (#3758)
22e984ba7 is described below

commit 22e984ba7afd0b144a599725b4fb0a19cb718bde
Author: Matthew Ho <[email protected]>
AuthorDate: Fri Sep 1 17:07:51 2023 -0700

    [GOBBLIN-1894] Add ability to filter topics with a period in kafka source (#3758)
    
    * [GOBBLIN-1894] Add ability to filter topics with a period in kafka source
    
    * Add the copyright information :0)
---
 .../extractor/extract/kafka/KafkaSource.java       |   6 +-
 .../source/extractor/extract/kafka/KafkaTopic.java |   5 +
 .../extractor/extract/kafka/KafkaSourceTest.java   | 107 +++++++++++++++++++++
 3 files changed, 117 insertions(+), 1 deletion(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 4454559e3..0bc4c948e 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -115,6 +115,7 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
   public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
   public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME = 
"previousOffsetFetchEpochTime";
+  public static final String ALLOW_PERIOD_IN_TOPIC_NAME = 
"gobblin.kafka.allowPeriodInTopicName";
   public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS = 
"gobblin.kafka.consumerClient.class";
   public static final String 
GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
       "gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -801,7 +802,10 @@ public abstract class KafkaSource<S, D> extends EventBasedSource<S, D> {
   protected List<KafkaTopic> getFilteredTopics(SourceState state) {
     List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state, 
TOPIC_BLACKLIST);
     List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state, 
TOPIC_WHITELIST);
-    return this.kafkaConsumerClient.get().getFilteredTopics(blacklist, 
whitelist);
+    if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true)) 
{
+      blacklist.add(Pattern.compile(".*\\..*"));
+    }
+    return kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist);
   }
 
   @Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
index ffafb54ee..f31ebcb49 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
@@ -23,6 +23,9 @@ import java.util.List;
 import com.google.common.base.Optional;
 import com.google.common.collect.Lists;
 
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
 import org.apache.gobblin.configuration.State;
 
 
@@ -32,6 +35,8 @@ import org.apache.gobblin.configuration.State;
  * @author Ziyang Liu
  *
  */
+@EqualsAndHashCode
+@ToString
 public final class KafkaTopic {
   private final String name;
   private final List<KafkaPartition> partitions;
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
new file mode 100644
index 000000000..9992d4442
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.util.DatasetFilterUtils;
+
+
+public class KafkaSourceTest {
+
+  @Test
+  public void testGetFilteredTopics() {
+    TestKafkaClient testKafkaClient = new TestKafkaClient();
+    List<String> allTopics = Arrays.asList(
+        "Topic1", "topic-v2", "topic3", // allowed
+        "topic-with.period-in_middle", ".topic-with-period-at-start", 
"topicWithPeriodAtEnd.", //period topics
+        "not-allowed-topic");
+    testKafkaClient.testTopics = allTopics;
+
+    SourceState state = new SourceState();
+    state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
+    state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
+    Assert.assertEquals(new 
TestKafkaSource(testKafkaClient).getFilteredTopics(state), 
toKafkaTopicList(allTopics.subList(0, 6)));
+
+    state.setProp(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, false);
+    Assert.assertEquals(new 
TestKafkaSource(testKafkaClient).getFilteredTopics(state), 
toKafkaTopicList(allTopics.subList(0, 3)));
+  }
+
+  public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) {
+    return topicNames.stream().map(topicName -> new KafkaTopic(topicName, 
Collections.emptyList())).collect(Collectors.toList());
+  }
+
+  private class TestKafkaClient implements GobblinKafkaConsumerClient {
+    List<String> testTopics;
+
+    @Override
+    public List<KafkaTopic> getFilteredTopics(List<Pattern> blacklist, 
List<Pattern> whitelist) {
+      return toKafkaTopicList(DatasetFilterUtils.filter(testTopics, blacklist, 
whitelist));
+    }
+
+    @Override
+    public long getEarliestOffset(KafkaPartition partition)
+        throws KafkaOffsetRetrievalFailureException {
+      return 0;
+    }
+
+    @Override
+    public long getLatestOffset(KafkaPartition partition)
+        throws KafkaOffsetRetrievalFailureException {
+      return 0;
+    }
+
+    @Override
+    public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, 
long nextOffset, long maxOffset) {
+      return null;
+    }
+
+    @Override
+    public void close()
+        throws IOException {
+
+    }
+  }
+
+  private class TestKafkaSource<S,D> extends KafkaSource<S,D> {
+
+    public TestKafkaSource(GobblinKafkaConsumerClient client) {
+      kafkaConsumerClient.set(client);
+    }
+
+    @Override
+    public Extractor getExtractor(WorkUnitState state)
+        throws IOException {
+      throw new RuntimeException("Not implemented");
+    }
+  }
+}

Reply via email to