This is an automated email from the ASF dual-hosted git repository.
zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 69d7e0fa4 [GOBBLIN-1876] Kafka source / extractor utility to get a
simple name for kafka brokers (#3738)
69d7e0fa4 is described below
commit 69d7e0fa4df2499934462854514ddc9ddbfe7dc7
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Aug 15 16:54:50 2023 -0700
[GOBBLIN-1876] Kafka source / extractor utility to get a simple name for
kafka brokers (#3738)
* [GOBBLIN-1876] Kafka source / extractor utility to get a simple name for
kafka brokers
* Configuration key was not backward compatible
* Address comments
---
.../gobblin/configuration/ConfigurationKeys.java | 1 +
.../java/org/apache/gobblin/KafkaCommonUtil.java | 24 +++++++++
.../extractor/extract/kafka/KafkaExtractor.java | 23 ++++++++
.../org/apache/gobblin/KafkaCommonUtilTest.java | 48 +++++++++++++++++
.../extract/kafka/KafkaExtractorTest.java | 62 ++++++++++++++++++++++
5 files changed, 158 insertions(+)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 50bd2a03d..d3aba4d23 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -916,6 +916,7 @@ public class ConfigurationKeys {
* Kafka job configurations.
*/
public static final String KAFKA_BROKERS = "kafka.brokers";
+ public static final String KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY =
"kafka.brokersToSimpleNameMap";
public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS =
"kafka.source.work.units.creation.threads";
public static final int
KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT = 30;
public static final String KAFKA_SOURCE_SHARE_CONSUMER_CLIENT =
"kafka.source.shareConsumerClient";
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
index 2ae059dd6..e85aed1c5 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -17,6 +17,8 @@
package org.apache.gobblin;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -25,9 +27,18 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import org.apache.gobblin.configuration.State;
+
+import static
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
+
public class KafkaCommonUtil {
public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
+ public static final String MAP_KEY_VALUE_DELIMITER_KEY = "->";
+ public static final Splitter LIST_SPLITTER =
Splitter.on(",").trimResults().omitEmptyStrings();
public static void runWithTimeout(final Runnable runnable, long timeout,
TimeUnit timeUnit) throws Exception {
runWithTimeout(() -> {
@@ -59,4 +70,17 @@ public class KafkaCommonUtil {
}
}
}
+
+  /**
+   * Parses the Kafka broker URI to simple name mapping stored in {@code state} under
+   * {@code KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY}.
+   *
+   * <p>The property value is a comma-separated list of entries, each of the form
+   * {@code <brokerUri>-><simpleName>}, e.g. {@code foobar.com:12345->foobar}. Whitespace around entries,
+   * broker URIs and simple names is ignored, and empty entries are skipped. When the same broker URI
+   * appears more than once, the last entry wins.
+   *
+   * @param state configuration holding the mapping
+   * @return a new mutable map from broker URI to simple name
+   * @throws IllegalArgumentException if the property is not set, or if an entry is missing the
+   *         {@code ->} delimiter, the broker URI, or the simple name
+   */
+  public static Map<String, String> getKafkaBrokerToSimpleNameMap(State state) {
+    Preconditions.checkArgument(state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
+        "Configuration must contain value for %s", KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+    String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+    Map<String, String> kafkaBrokerUriToSimpleName = new HashMap<>();
+    // LIST_SPLITTER already trims every entry and drops the empty ones.
+    for (String entry : LIST_SPLITTER.splitToList(mapStr)) {
+      String[] items = entry.split(MAP_KEY_VALUE_DELIMITER_KEY);
+      // Entries such as "broker" or "broker->" yield fewer than two items; report them as a configuration
+      // error instead of letting items[1] raise an ArrayIndexOutOfBoundsException.
+      String brokerUri = items.length >= 2 ? items[0].trim() : "";
+      String simpleName = items.length >= 2 ? items[1].trim() : "";
+      Preconditions.checkArgument(!brokerUri.isEmpty() && !simpleName.isEmpty(),
+          "Malformed entry '%s' in %s. Expected format is <brokerUri>%s<simpleName>",
+          entry, KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, MAP_KEY_VALUE_DELIMITER_KEY);
+      kafkaBrokerUriToSimpleName.put(brokerUri, simpleName);
+    }
+
+    return kafkaBrokerUriToSimpleName;
+  }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 2006a574a..9a29411c2 100644
---
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
import lombok.Getter;
+import org.apache.gobblin.KafkaCommonUtil;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
@@ -49,6 +50,8 @@ import
org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
+import static
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
+
/**
* An implementation of {@link Extractor} for Apache Kafka. Each {@link
KafkaExtractor} processes
@@ -337,4 +340,24 @@ public abstract class KafkaExtractor<S, D> extends
EventBasedExtractor<S, D> {
public long getHighWatermark() {
return 0;
}
+
+  /**
+   * Resolves the simple name of the Kafka broker configured for this job.
+   *
+   * <p>Reads the broker URI from {@link ConfigurationKeys#KAFKA_BROKERS}, which must hold exactly one
+   * broker, and looks it up in the map parsed by
+   * {@link KafkaCommonUtil#getKafkaBrokerToSimpleNameMap(State)}.
+   *
+   * @param state configuration holding both the broker URI and the broker-to-simple-name map
+   * @return the simple name mapped to the configured broker URI
+   * @throws IllegalArgumentException if {@link ConfigurationKeys#KAFKA_BROKERS} is not set, does not
+   *         contain exactly one broker, or the broker URI has no entry in the simple name map
+   */
+  public static String getKafkaBrokerSimpleName(State state) {
+    // The Preconditions message templates are only formatted when a check actually fails.
+    Preconditions.checkArgument(state.contains(ConfigurationKeys.KAFKA_BROKERS),
+        "%s is not defined in the configuration.", ConfigurationKeys.KAFKA_BROKERS);
+    List<String> kafkaBrokerUriList = state.getPropAsList(ConfigurationKeys.KAFKA_BROKERS);
+    Preconditions.checkArgument(kafkaBrokerUriList.size() == 1,
+        "The %s only supports having exactly one kafka broker defined for %s. "
+            + "This is partially because the watermark implementation (e.g. %s class) does not have a schema "
+            + "that supports writing watermarks that contains offsets from multiple brokers in a single job",
+        KafkaExtractor.class.getSimpleName(), ConfigurationKeys.KAFKA_BROKERS,
+        KafkaStreamingExtractor.KafkaWatermark.class.getName());
+
+    String brokerUri = kafkaBrokerUriList.get(0);
+    Map<String, String> brokerToSimpleName = KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state);
+
+    // Look the broker up once and reuse the result for both the check and the return value.
+    String simpleName = brokerToSimpleName.get(brokerUri);
+    Preconditions.checkArgument(simpleName != null,
+        "Unable to find simple name for the kafka cluster broker uri in the config. Please check the map "
+            + "value of %s. brokerUri=%s, configMapValue=%s",
+        KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri, brokerToSimpleName);
+
+    return simpleName;
+  }
}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
new file mode 100644
index 000000000..273e41481
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+/** Unit tests for {@link KafkaCommonUtil}. */
+public class KafkaCommonUtilTest {
+
+  @Test
+  public void testGetKafkaBrokerToSimpleNameMap() {
+    final String uri = "kafka.some-identifier.kafka.coloc-123.com:12345";
+    final String name = "some-identifier";
+    final String mapKey = ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
+    final State jobState = new State();
+
+    // Without the map property the lookup must be rejected.
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(jobState));
+
+    // A single entry produces a single-element map.
+    String singleEntry = uri + "->" + name;
+    jobState.setProp(mapKey, singleEntry);
+    Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(jobState), ImmutableMap.of(uri, name));
+
+    // Comma-separated entries are all parsed; map equality does not depend on entry order.
+    String twoEntries = "foobar.com:12345->foobar," + singleEntry;
+    jobState.setProp(mapKey, twoEntries);
+    Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(jobState),
+        ImmutableMap.of(uri, name, "foobar.com:12345", "foobar"));
+  }
+}
diff --git
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
new file mode 100644
index 000000000..7f1b27e58
--- /dev/null
+++
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+/** Unit tests for the static helpers of {@link KafkaExtractor}. */
+public class KafkaExtractorTest {
+
+  @Test
+  public void testGetKafkaBrokerSimpleName() {
+    State state = new State();
+    // No broker configured at all.
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+    // An empty broker value must be rejected as well.
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, "");
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    final String kafkaBrokerUri = "kafka.broker.uri.com:12345";
+    final String kafkaBrokerSimpleName = "simple.kafka.name";
+    // Broker configured, but the broker-to-simple-name map is missing.
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri);
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    // Map is present but has no entry for the configured broker. This is a plain literal: the previous
+    // String.format call had no format specifiers, so its arguments were silently ignored.
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, "foobar->foobarId");
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    // Map contains the configured broker alongside an unrelated entry.
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
+        String.format("%s->%s,foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
+    Assert.assertEquals(KafkaExtractor.getKafkaBrokerSimpleName(state), kafkaBrokerSimpleName);
+  }
+
+  /**
+   * Checks that both {@link ConfigurationKeys#KAFKA_BROKERS} and
+   * {@link ConfigurationKeys#KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY} can be set as string values in the
+   * same {@link Config} and read back unchanged.
+   * NOTE(review): presumably guards against picking a map key that collides with the existing
+   * "kafka.brokers" path - confirm against the original review discussion.
+   */
+  @Test
+  public void testSimpleMapKeyIsBackwardCompatible() {
+    Config cfg = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("kafkaBrokerUri"))
+        .withValue(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
+            ConfigValueFactory.fromAnyRef("kafkaBrokerUri->simpleName"));
+
+    Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS), "kafkaBrokerUri");
+    Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
+        "kafkaBrokerUri->simpleName");
+  }
+}