b-slim closed pull request #6455: Remove consumer.listTopics() method in case 
when too many topics in kafka causes the FullGC in Overlord
URL: https://github.com/apache/incubator-druid/pull/6455
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b7845cae220..57d6feb27c9 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -767,8 +767,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
         // as well as the case where the metadata store do not have an entry 
for the reset partitions
         boolean doReset = false;
         for (Entry<Integer, Long> resetPartitionOffset : 
resetKafkaMetadata.getKafkaPartitions()
-                                                                               
.getPartitionOffsetMap()
-                                                                               
.entrySet()) {
+                                                                           
.getPartitionOffsetMap()
+                                                                           
.entrySet()) {
           final Long partitionOffsetInMetadataStore = currentMetadata == null
                                                       ? null
                                                       : 
currentMetadata.getKafkaPartitions()
@@ -1034,13 +1034,13 @@ protected void tryInit()
 
   private void updatePartitionDataFromKafka()
   {
-    Map<String, List<PartitionInfo>> topics;
+    List<PartitionInfo> partitions;
     try {
       synchronized (consumerLock) {
-        topics = consumer.listTopics(); // updates the consumer's list of 
partitions from the brokers
+        partitions = consumer.partitionsFor(ioConfig.getTopic());
       }
     }
-    catch (Exception e) { // calls to the consumer throw NPEs when the broker 
doesn't respond
+    catch (Exception e) {
       log.warn(
           e,
           "Unable to get partition data from Kafka for brokers [%s], are the 
brokers up?",
@@ -1049,10 +1049,6 @@ private void updatePartitionDataFromKafka()
       return;
     }
 
-    List<PartitionInfo> partitions = topics.get(ioConfig.getTopic());
-    if (partitions == null) {
-      log.warn("No such topic [%s] found, list of discovered topics [%s]", 
ioConfig.getTopic(), topics.keySet());
-    }
     int numPartitions = (partitions != null ? partitions.size() : 0);
 
     log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions, 
ioConfig.getTopic());
@@ -1101,7 +1097,7 @@ private void discoverTasks() throws ExecutionException, 
InterruptedException, Ti
       taskCount++;
       final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
       final String taskId = task.getId();
-      
+
       // Determine which task group this task belongs to based on one of the 
partitions handled by this task. If we
       // later determine that this task is actively reading, we will make sure 
that it matches our current partition
       // allocation (getTaskGroupIdForPartition(partition) should return the 
same value for every partition being read
@@ -2263,16 +2259,17 @@ private Runnable buildRunTask()
   private void updateLatestOffsetsFromKafka()
   {
     synchronized (consumerLock) {
-      final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+      final List<PartitionInfo> partitionInfoList = 
consumer.partitionsFor(ioConfig.getTopic());
 
-      if (topics == null || !topics.containsKey(ioConfig.getTopic())) {
+      if (partitionInfoList == null || partitionInfoList.size() == 0) {
         throw new ISE("Could not retrieve partitions for topic [%s]", 
ioConfig.getTopic());
       }
 
-      final Set<TopicPartition> topicPartitions = 
topics.get(ioConfig.getTopic())
-                                                        .stream()
-                                                        .map(x -> new 
TopicPartition(x.topic(), x.partition()))
-                                                        
.collect(Collectors.toSet());
+      final Set<TopicPartition> topicPartitions = partitionInfoList
+          .stream()
+          .map(x -> new TopicPartition(x.topic(), x.partition()))
+          .collect(Collectors.toSet());
+
       consumer.assign(topicPartitions);
       consumer.seekToEnd(topicPartitions);
 
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c4e24f185cc..084696d7ab9 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -26,6 +26,9 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZkUtils;
 import org.apache.curator.test.TestingCluster;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -75,6 +78,7 @@
 import org.apache.druid.server.metrics.NoopServiceEmitter;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.security.JaasUtils;
 import org.easymock.Capture;
 import org.easymock.CaptureType;
 import org.easymock.EasyMock;
@@ -101,6 +105,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
@@ -133,6 +138,7 @@
   private static String kafkaHost;
   private static DataSchema dataSchema;
   private static int topicPostfix;
+  private static ZkUtils zkUtils;
 
   private final int numThreads;
 
@@ -174,12 +180,19 @@ public static void setupClass() throws Exception
         zkServer.getConnectString(),
         null,
         1,
-        ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
+        ImmutableMap.of(
+            "num.partitions",
+            String.valueOf(NUM_PARTITIONS),
+            "auto.create.topics.enable",
+            String.valueOf(false)
+        )
     );
     kafkaServer.start();
     kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
 
     dataSchema = getDataSchema(DATASOURCE);
+
+    zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000, 
JaasUtils.isZkSecurityEnabled());
   }
 
   @Before
@@ -238,6 +251,9 @@ public static void tearDownClass() throws IOException
 
     zkServer.stop();
     zkServer = null;
+
+    zkUtils.close();
+    zkUtils = null;
   }
 
   @Test
@@ -2200,7 +2216,8 @@ public void testCheckpointForUnknownTaskGroup() throws 
InterruptedException
       Thread.sleep(100);
     }
 
-    
Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE:
 WTH?! cannot find"));
+    Assert.assertTrue(serviceEmitter.getStackTrace()
+                                    
.startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
     Assert.assertEquals(
         "WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
         serviceEmitter.getExceptionMessage()
@@ -2532,6 +2549,9 @@ public void testFailedInitializationAndRecovery() throws 
Exception
 
   private void addSomeEvents(int numEventsPerPartition) throws Exception
   {
+    //create topic manually
+    AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new 
Properties(), RackAwareMode.Enforced$.MODULE$);
+
     try (final KafkaProducer<byte[], byte[]> kafkaProducer = 
kafkaServer.newProducer()) {
       for (int i = 0; i < NUM_PARTITIONS; i++) {
         for (int j = 0; j < numEventsPerPartition; j++) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to