This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2488c3f  [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
2488c3f is described below

commit 2488c3fa3e38e32e339162ac472f2e8ad035c6c4
Author: suvasude <[email protected]>
AuthorDate: Mon May 24 17:21:42 2021 -0700

    [GOBBLIN-1449] Correctly parse topic name from topic partition string if topic name has hyphens
    
    Closes #3287 from sv2000/topicNameFix
---
 .../source/extractor/extract/kafka/KafkaUtils.java | 15 +++++++++++
 .../packer/KafkaTopicGroupingWorkUnitPacker.java   | 11 +++++---
 .../extractor/extract/kafka/KafkaUtilsTest.java    | 30 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
index f5b52b7..4dc41ef 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtils.java
@@ -23,7 +23,9 @@ import org.apache.gobblin.configuration.WorkUnitState;
 
 import java.util.List;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
 import com.google.common.collect.Lists;
 
 import lombok.extern.slf4j.Slf4j;
@@ -35,6 +37,8 @@ import lombok.extern.slf4j.Slf4j;
 @Slf4j
 public class KafkaUtils {
 
+  private static final String TOPIC_PARTITION_DELIMITER = "-";
+
   /**
    * Get topic name from a {@link State} object. The {@link State} should contain property
    * {@link KafkaSource#TOPIC_NAME}.
@@ -182,4 +186,15 @@ public class KafkaUtils {
     return Long.parseLong(workUnitState.contains(key) ? 
workUnitState.getProp(key)
         : workUnitState.getProp(KafkaUtils.getPartitionPropName(key, 
partitionId), "0"));
   }
+
+  /**
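+   * Get the topic name from a topic-partition string of the form {@code <topic>-<partition>}; the topic may itself contain hyphens.
+   * @param topicPartition the topic-partition string; must contain at least one "-"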
+   */
+  public static String getTopicNameFromTopicPartition(String topicPartition) {
+    
Preconditions.checkArgument(topicPartition.contains(TOPIC_PARTITION_DELIMITER));
+    List<String> parts = 
Splitter.on(TOPIC_PARTITION_DELIMITER).splitToList(topicPartition);
+    return Joiner.on(TOPIC_PARTITION_DELIMITER).join(parts.subList(0, 
parts.size() - 1));
+  }
+
 }
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
index 17a7089..f01b648 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.gson.Gson;
@@ -46,6 +45,7 @@ import 
org.apache.gobblin.source.extractor.extract.AbstractSource;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition;
 import org.apache.gobblin.source.extractor.extract.kafka.KafkaSource;
 import 
org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamingExtractor;
+import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.util.io.GsonInterfaceAdapter;
@@ -70,7 +70,6 @@ import static 
org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
 public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
   public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
   private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
-  private static final String TOPIC_PARTITION_DELIMITER = "-";
 
   //A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
   //single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
@@ -224,6 +223,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
     return squeezedGroups;
   }
 
+
   /**
    * TODO: This method should be moved into {@link KafkaSource}, which requires moving classes such
    * as {@link KafkaStreamingExtractor.KafkaWatermark} to the open source. A side-effect of this method is to
@@ -243,7 +243,7 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
           GSON.fromJson(state.getProp(topicPartition), 
KafkaStreamingExtractor.KafkaWatermark.class);
       lastCommittedWatermarks.put(topicPartition, watermark);
       if (this.isPerTopicContainerCapacityEnabled) {
-        String topicName = topicPartition.split(TOPIC_PARTITION_DELIMITER)[0];
+        String topicName = 
KafkaUtils.getTopicNameFromTopicPartition(topicPartition);
         List<Double> capacities = capacitiesByTopic.getOrDefault(topicName, 
Lists.newArrayList());
         capacities.add(watermark.getAvgConsumeRate() > 0 ? 
watermark.getAvgConsumeRate() : DEFAULT_CONTAINER_CAPACITY);
         capacitiesByTopic.put(topicName, capacities);
@@ -338,7 +338,10 @@ public class KafkaTopicGroupingWorkUnitPacker extends 
KafkaWorkUnitPacker {
    */
   @VisibleForTesting
   static double getContainerCapacityForTopic(List<Double> capacities, 
ContainerCapacityComputationStrategy strategy) {
-    Preconditions.checkArgument(capacities.size() > 0, "Capacities size must 
be > 0");
+    //No previous stats for a topic? Return default.
+    if (capacities == null) {
+      return DEFAULT_CONTAINER_CAPACITY;
+    }
     Collections.sort(capacities);
     log.info("Capacity computation strategy: {}, capacities: {}", 
strategy.name(), capacities);
     switch (strategy) {
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
new file mode 100644
index 0000000..3683b8b
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaUtilsTest.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.source.extractor.extract.kafka;
+
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class KafkaUtilsTest {
+
+  @Test
+  public void testGetTopicNameFromTopicPartition() {
+    Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-1"), 
"topic");
+    
Assert.assertEquals(KafkaUtils.getTopicNameFromTopicPartition("topic-foo-bar-1"),
 "topic-foo-bar");
+  }
+}
\ No newline at end of file

Reply via email to