This is an automated email from the ASF dual-hosted git repository.

zihanli58 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 69d7e0fa4 [GOBBLIN-1876] Kafka source / extractor utility to get a simple name for kafka brokers (#3738)
69d7e0fa4 is described below

commit 69d7e0fa4df2499934462854514ddc9ddbfe7dc7
Author: Matthew Ho <[email protected]>
AuthorDate: Tue Aug 15 16:54:50 2023 -0700

    [GOBBLIN-1876] Kafka source / extractor utility to get a simple name for kafka brokers (#3738)
    
    * [GOBBLIN-1876] Kafka source / extractor utility to get a simple name for kafka brokers
    
    * Configuration key was not backward compatible
    
    * Address comments
---
 .../gobblin/configuration/ConfigurationKeys.java   |  1 +
 .../java/org/apache/gobblin/KafkaCommonUtil.java   | 24 +++++++++
 .../extractor/extract/kafka/KafkaExtractor.java    | 23 ++++++++
 .../org/apache/gobblin/KafkaCommonUtilTest.java    | 48 +++++++++++++++++
 .../extract/kafka/KafkaExtractorTest.java          | 62 ++++++++++++++++++++++
 5 files changed, 158 insertions(+)

diff --git 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 50bd2a03d..d3aba4d23 100644
--- 
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ 
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -916,6 +916,7 @@ public class ConfigurationKeys {
    * Kafka job configurations.
    */
   public static final String KAFKA_BROKERS = "kafka.brokers";
+  public static final String KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY = 
"kafka.brokersToSimpleNameMap"; // Value: comma-separated <brokerUri>-><simpleName> entries, parsed by KafkaCommonUtil#getKafkaBrokerToSimpleNameMap
   public static final String KAFKA_SOURCE_WORK_UNITS_CREATION_THREADS = 
"kafka.source.work.units.creation.threads";
   public static final int 
KAFKA_SOURCE_WORK_UNITS_CREATION_DEFAULT_THREAD_COUNT = 30;
   public static final String KAFKA_SOURCE_SHARE_CONSUMER_CLIENT = 
"kafka.source.shareConsumerClient";
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
index 2ae059dd6..e85aed1c5 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/KafkaCommonUtil.java
@@ -17,6 +17,8 @@
 
 package org.apache.gobblin;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -25,9 +27,18 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+
+import org.apache.gobblin.configuration.State;
+
+import static 
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
+
 
 public class KafkaCommonUtil {
   public static final long KAFKA_FLUSH_TIMEOUT_SECONDS = 15L;
+  public static final String MAP_KEY_VALUE_DELIMITER_KEY = "->"; // Separates broker uri from simple name inside one map entry; despite the _KEY suffix this is the delimiter itself, not a config key
+  public static final Splitter LIST_SPLITTER = 
Splitter.on(",").trimResults().omitEmptyStrings(); // Splits a comma-separated list, trimming each item and dropping empty ones
 
   public static void runWithTimeout(final Runnable runnable, long timeout, 
TimeUnit timeUnit) throws Exception {
     runWithTimeout(() -> {
@@ -59,4 +70,35 @@ public class KafkaCommonUtil {
       }
     }
   }
+
+  /**
+   * Parses {@code KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY} into a map from kafka broker uri to simple name.
+   *
+   * <p>The value is a comma-separated list of {@code <brokerUri>-><simpleName>} entries, e.g.
+   * {@code foobar.com:12345->foobar,other.com:12345->other}. Whitespace around entries, broker uris and simple
+   * names is trimmed and empty entries are ignored. If a broker uri is listed more than once, the last entry wins.
+   *
+   * @param state configuration holding the mapping
+   * @return a new mutable map from broker uri to simple name
+   * @throws IllegalArgumentException if the key is missing, or if an entry is not exactly one non-empty broker uri
+   *     and one non-empty simple name separated by {@code ->}
+   */
+  public static Map<String, String> getKafkaBrokerToSimpleNameMap(State state) {
+    Preconditions.checkArgument(state.contains(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
+        "Configuration must contain value for %s", KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+    String mapStr = state.getProp(KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY);
+    Map<String, String> kafkaBrokerUriToSimpleName = new HashMap<>();
+    for (String entry : LIST_SPLITTER.splitToList(mapStr)) {
+      // A negative limit keeps trailing empty strings, so entries such as "uri", "uri->" or "uri->name->x" are
+      // rejected with a clear message instead of an ArrayIndexOutOfBoundsException or silent truncation.
+      // String#split takes a regex; the "->" delimiter contains no regex metacharacters.
+      String[] items = entry.split(MAP_KEY_VALUE_DELIMITER_KEY, -1);
+      Preconditions.checkArgument(items.length == 2 && !items[0].trim().isEmpty() && !items[1].trim().isEmpty(),
+          "Malformed entry '%s' in %s, expected <brokerUri>%s<simpleName>", entry,
+          KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, MAP_KEY_VALUE_DELIMITER_KEY);
+      kafkaBrokerUriToSimpleName.put(items[0].trim(), items[1].trim());
+    }
+
+    return kafkaBrokerUriToSimpleName;
+  }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 2006a574a..9a29411c2 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -33,6 +33,7 @@ import com.google.common.collect.Maps;
 
 import lombok.Getter;
 
+import org.apache.gobblin.KafkaCommonUtil;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.configuration.WorkUnitState;
@@ -49,6 +50,8 @@ import 
org.apache.gobblin.source.extractor.extract.EventBasedExtractor;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 
+import static 
org.apache.gobblin.configuration.ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY;
+
 
 /**
  * An implementation of {@link Extractor} for Apache Kafka. Each {@link 
KafkaExtractor} processes
@@ -337,4 +340,38 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
   public long getHighWatermark() {
     return 0;
   }
+
+  /**
+   * Resolves the simple name of the single kafka broker configured under {@link ConfigurationKeys#KAFKA_BROKERS},
+   * using the mapping parsed by {@link KafkaCommonUtil#getKafkaBrokerToSimpleNameMap(State)}.
+   *
+   * @param state configuration containing {@link ConfigurationKeys#KAFKA_BROKERS} and
+   *     {@link ConfigurationKeys#KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY}
+   * @return the simple name mapped to the configured broker uri
+   * @throws IllegalArgumentException if the broker list is missing or does not hold exactly one broker, or if the
+   *     mapping is missing or has no entry for that broker
+   */
+  public static String getKafkaBrokerSimpleName(State state) {
+    // Guava message templates are only rendered when a check fails, unlike an eager String.format.
+    Preconditions.checkArgument(state.contains(ConfigurationKeys.KAFKA_BROKERS),
+        "%s is not defined in the configuration.", ConfigurationKeys.KAFKA_BROKERS);
+    List<String> kafkaBrokerUriList = state.getPropAsList(ConfigurationKeys.KAFKA_BROKERS);
+    Preconditions.checkArgument(kafkaBrokerUriList.size() == 1,
+        "The %s only supports having exactly one kafka broker defined for %s. "
+            + "This is partially because the watermark implementation (e.g. %s class) does not have a schema that "
+            + "supports writing watermarks that contains offsets from multiple brokers in a single job",
+        KafkaExtractor.class.getSimpleName(), ConfigurationKeys.KAFKA_BROKERS,
+        KafkaStreamingExtractor.KafkaWatermark.class.getName());
+
+    String brokerUri = kafkaBrokerUriList.get(0);
+    Map<String, String> brokerToSimpleName = KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state);
+    String simpleName = brokerToSimpleName.get(brokerUri);
+
+    Preconditions.checkArgument(simpleName != null,
+        "Unable to find simple name for the kafka cluster broker uri in the config. Please check the map "
+            + "value of %s. brokerUri=%s, configMapValue=%s", KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, brokerUri,
+        brokerToSimpleName);
+
+    return simpleName;
+  }
 }
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
new file mode 100644
index 000000000..273e41481
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/KafkaCommonUtilTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+public class KafkaCommonUtilTest {
+  private static final String BROKER_URI = "kafka.some-identifier.kafka.coloc-123.com:12345";
+  private static final String SIMPLE_NAME = "some-identifier";
+
+  /** Missing key, then a single entry, then several comma-separated entries. */
+  @Test
+  public void testGetKafkaBrokerToSimpleNameMap() {
+    final State state = new State();
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state));
+
+    final String singleEntry = BROKER_URI + "->" + SIMPLE_NAME;
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, singleEntry);
+    Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
+        ImmutableMap.of(BROKER_URI, SIMPLE_NAME));
+
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, "foobar.com:12345->foobar," + singleEntry);
+    Assert.assertEquals(KafkaCommonUtil.getKafkaBrokerToSimpleNameMap(state),
+        ImmutableMap.of(BROKER_URI, SIMPLE_NAME, "foobar.com:12345", "foobar"));
+  }
+}
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
new file mode 100644
index 000000000..7f1b27e58
--- /dev/null
+++ 
b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.State;
+
+
+public class KafkaExtractorTest {
+
+  @Test
+  public void testGetKafkaBrokerSimpleName() {
+    State state = new State();
+    // Broker list missing, then empty
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, "");
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    final String kafkaBrokerUri = "kafka.broker.uri.com:12345";
+    final String kafkaBrokerSimpleName = "simple.kafka.name";
+    // Broker set but the simple name map is missing
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri);
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    // Map present but has no entry for the configured broker
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY, "foobar->foobarId");
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
+        String.format("%s->%s,foobar->foobarId", kafkaBrokerUri, kafkaBrokerSimpleName));
+    Assert.assertEquals(KafkaExtractor.getKafkaBrokerSimpleName(state), kafkaBrokerSimpleName);
+
+    // Only a single broker is supported, even when every configured broker has a mapping
+    state.setProp(ConfigurationKeys.KAFKA_BROKERS, kafkaBrokerUri + ",foobar");
+    Assert.assertThrows(IllegalArgumentException.class, () -> KafkaExtractor.getKafkaBrokerSimpleName(state));
+  }
+
+  /**
+   * Typesafe Config treats '.' as a path separator, so a map key nested under {@code kafka.brokers}
+   * (e.g. {@code kafka.brokers.simpleNameMap}) could not coexist with the string value of {@code kafka.brokers}.
+   * Checks that both keys can be set and read back side by side.
+   */
+  @Test
+  public void testSimpleMapKeyIsBackwardCompatible() {
+    Config cfg = ConfigFactory.empty()
+        .withValue(ConfigurationKeys.KAFKA_BROKERS, ConfigValueFactory.fromAnyRef("kafkaBrokerUri"))
+        .withValue(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY,
+            ConfigValueFactory.fromAnyRef("kafkaBrokerUri->simpleName"));
+
+    Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS), "kafkaBrokerUri");
+    Assert.assertEquals(cfg.getString(ConfigurationKeys.KAFKA_BROKERS_TO_SIMPLE_NAME_MAP_KEY),
+        "kafkaBrokerUri->simpleName");
+  }
+}

Reply via email to