This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 22e984ba7 [GOBBLIN-1894] Add ability to filter topics with a period in
kafka source (#3758)
22e984ba7 is described below
commit 22e984ba7afd0b144a599725b4fb0a19cb718bde
Author: Matthew Ho <[email protected]>
AuthorDate: Fri Sep 1 17:07:51 2023 -0700
[GOBBLIN-1894] Add ability to filter topics with a period in kafka source
(#3758)
* [GOBBLIN-1894] Add ability to filter topics with a period in kafka source
* Add the copyright information :0)
---
.../extractor/extract/kafka/KafkaSource.java | 6 +-
.../source/extractor/extract/kafka/KafkaTopic.java | 5 +
.../extractor/extract/kafka/KafkaSourceTest.java | 107 +++++++++++++++++++++
3 files changed, 117 insertions(+), 1 deletion(-)
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index 4454559e3..0bc4c948e 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -115,6 +115,7 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
public static final String PREVIOUS_LATEST_OFFSET = "previousLatestOffset";
public static final String OFFSET_FETCH_EPOCH_TIME = "offsetFetchEpochTime";
public static final String PREVIOUS_OFFSET_FETCH_EPOCH_TIME =
"previousOffsetFetchEpochTime";
+ public static final String ALLOW_PERIOD_IN_TOPIC_NAME =
"gobblin.kafka.allowPeriodInTopicName"; // boolean property read by getFilteredTopics (default true); when false, topic names containing '.' are blacklisted
public static final String GOBBLIN_KAFKA_CONSUMER_CLIENT_FACTORY_CLASS =
"gobblin.kafka.consumerClient.class";
public static final String
GOBBLIN_KAFKA_EXTRACT_ALLOW_TABLE_TYPE_NAMESPACE_CUSTOMIZATION =
"gobblin.kafka.extract.allowTableTypeAndNamspaceCustomization";
@@ -801,7 +802,10 @@ public abstract class KafkaSource<S, D> extends
EventBasedSource<S, D> {
protected List<KafkaTopic> getFilteredTopics(SourceState state) {
List<Pattern> blacklist = DatasetFilterUtils.getPatternList(state,
TOPIC_BLACKLIST);
List<Pattern> whitelist = DatasetFilterUtils.getPatternList(state,
TOPIC_WHITELIST);
- return this.kafkaConsumerClient.get().getFilteredTopics(blacklist,
whitelist);
+ if (!state.getPropAsBoolean(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, true))
{ // default is true, so periods stay allowed when the property is unset
+ blacklist.add(Pattern.compile(".*\\..*")); // extra blacklist entry: any topic name containing a literal '.'
+ }
+ return kafkaConsumerClient.get().getFilteredTopics(blacklist, whitelist); // the actual black/whitelist filtering is delegated to the consumer client
}
@Override
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
index ffafb54ee..f31ebcb49 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaTopic.java
@@ -23,6 +23,9 @@ import java.util.List;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+
import org.apache.gobblin.configuration.State;
@@ -32,6 +35,8 @@ import org.apache.gobblin.configuration.State;
* @author Ziyang Liu
*
*/
+@EqualsAndHashCode
+@ToString
public final class KafkaTopic {
private final String name;
private final List<KafkaPartition> partitions;
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
new file mode 100644
index 000000000..9992d4442
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSourceTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
+import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
+import org.apache.gobblin.source.extractor.Extractor;
+import org.apache.gobblin.util.DatasetFilterUtils;
+
+
+public class KafkaSourceTest { // Unit test for KafkaSource#getFilteredTopics and the ALLOW_PERIOD_IN_TOPIC_NAME switch
+
+ @Test
+ public void testGetFilteredTopics() { // runs the same topic list through the filter twice: flag unset, then flag set to false
+ TestKafkaClient testKafkaClient = new TestKafkaClient();
+ List<String> allTopics = Arrays.asList( // order matters: the assertions below slice this list by index
+ "Topic1", "topic-v2", "topic3", // indexes 0-2: match the whitelist and contain no period
+ "topic-with.period-in_middle", ".topic-with-period-at-start",
"topicWithPeriodAtEnd.", // indexes 3-5: contain a period at the middle, start and end
+ "not-allowed-topic"); // index 6: matches the whitelist but also the blacklist configured below
+ testKafkaClient.testTopics = allTopics;
+
+ SourceState state = new SourceState();
+ state.setProp(KafkaSource.TOPIC_WHITELIST, ".*[Tt]opic.*");
+ state.setProp(KafkaSource.TOPIC_BLACKLIST, "not-allowed.*");
+ Assert.assertEquals(new
TestKafkaSource(testKafkaClient).getFilteredTopics(state),
toKafkaTopicList(allTopics.subList(0, 6))); // flag unset: expects everything except the blacklisted topic
+
+ state.setProp(KafkaSource.ALLOW_PERIOD_IN_TOPIC_NAME, false);
+ Assert.assertEquals(new
TestKafkaSource(testKafkaClient).getFilteredTopics(state),
toKafkaTopicList(allTopics.subList(0, 3))); // flag false: expects the three period-containing names to be dropped as well
+ }
+
+ public List<KafkaTopic> toKafkaTopicList(List<String> topicNames) { // wraps each name in a KafkaTopic with an empty partition list, preserving order
+ return topicNames.stream().map(topicName -> new KafkaTopic(topicName,
Collections.emptyList())).collect(Collectors.toList());
+ }
+
+ private class TestKafkaClient implements GobblinKafkaConsumerClient { // stub client backed by an in-memory list of topic names
+ List<String> testTopics; // topic names this stub reports; assigned by the test before use
+
+ @Override
+ public List<KafkaTopic> getFilteredTopics(List<Pattern> blacklist,
List<Pattern> whitelist) {
+ return toKafkaTopicList(DatasetFilterUtils.filter(testTopics, blacklist,
whitelist)); // applies the supplied patterns to testTopics via DatasetFilterUtils
+ }
+
+ @Override
+ public long getEarliestOffset(KafkaPartition partition)
+ throws KafkaOffsetRetrievalFailureException {
+ return 0; // placeholder; no offset is asserted on in this test class
+ }
+
+ @Override
+ public long getLatestOffset(KafkaPartition partition)
+ throws KafkaOffsetRetrievalFailureException {
+ return 0; // placeholder; no offset is asserted on in this test class
+ }
+
+ @Override
+ public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition,
long nextOffset, long maxOffset) {
+ return null; // placeholder; testGetFilteredTopics never consumes records
+ }
+
+ @Override
+ public void close()
+ throws IOException {
+
+ }
+ }
+
+ private class TestKafkaSource<S,D> extends KafkaSource<S,D> { // minimal concrete KafkaSource wired to a caller-supplied client
+
+ public TestKafkaSource(GobblinKafkaConsumerClient client) {
+ kafkaConsumerClient.set(client); // inject the stub so getFilteredTopics() delegates to it
+ }
+
+ @Override
+ public Extractor getExtractor(WorkUnitState state)
+ throws IOException {
+ throw new RuntimeException("Not implemented");
+ }
+ }
+}