b-slim closed pull request #6455: Remove the consumer.listTopics() call, which
causes a full GC in the Overlord when Kafka has too many topics
URL: https://github.com/apache/incubator-druid/pull/6455
This PR was merged from a forked repository. Because GitHub does not show the
original diff of a merged pull request from a fork, the diff is reproduced
below for the sake of provenance:
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index b7845cae220..57d6feb27c9 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -767,8 +767,8 @@ void resetInternal(DataSourceMetadata dataSourceMetadata)
// as well as the case where the metadata store do not have an entry
for the reset partitions
boolean doReset = false;
for (Entry<Integer, Long> resetPartitionOffset :
resetKafkaMetadata.getKafkaPartitions()
-
.getPartitionOffsetMap()
-
.entrySet()) {
+
.getPartitionOffsetMap()
+
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
? null
:
currentMetadata.getKafkaPartitions()
@@ -1034,13 +1034,13 @@ protected void tryInit()
private void updatePartitionDataFromKafka()
{
- Map<String, List<PartitionInfo>> topics;
+ List<PartitionInfo> partitions;
try {
synchronized (consumerLock) {
- topics = consumer.listTopics(); // updates the consumer's list of
partitions from the brokers
+ partitions = consumer.partitionsFor(ioConfig.getTopic());
}
}
- catch (Exception e) { // calls to the consumer throw NPEs when the broker
doesn't respond
+ catch (Exception e) {
log.warn(
e,
"Unable to get partition data from Kafka for brokers [%s], are the
brokers up?",
@@ -1049,10 +1049,6 @@ private void updatePartitionDataFromKafka()
return;
}
- List<PartitionInfo> partitions = topics.get(ioConfig.getTopic());
- if (partitions == null) {
- log.warn("No such topic [%s] found, list of discovered topics [%s]",
ioConfig.getTopic(), topics.keySet());
- }
int numPartitions = (partitions != null ? partitions.size() : 0);
log.debug("Found [%d] Kafka partitions for topic [%s]", numPartitions,
ioConfig.getTopic());
@@ -1101,7 +1097,7 @@ private void discoverTasks() throws ExecutionException,
InterruptedException, Ti
taskCount++;
final KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
final String taskId = task.getId();
-
+
// Determine which task group this task belongs to based on one of the
partitions handled by this task. If we
// later determine that this task is actively reading, we will make sure
that it matches our current partition
// allocation (getTaskGroupIdForPartition(partition) should return the
same value for every partition being read
@@ -2263,16 +2259,17 @@ private Runnable buildRunTask()
private void updateLatestOffsetsFromKafka()
{
synchronized (consumerLock) {
- final Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+ final List<PartitionInfo> partitionInfoList =
consumer.partitionsFor(ioConfig.getTopic());
- if (topics == null || !topics.containsKey(ioConfig.getTopic())) {
+ if (partitionInfoList == null || partitionInfoList.size() == 0) {
throw new ISE("Could not retrieve partitions for topic [%s]",
ioConfig.getTopic());
}
- final Set<TopicPartition> topicPartitions =
topics.get(ioConfig.getTopic())
- .stream()
- .map(x -> new
TopicPartition(x.topic(), x.partition()))
-
.collect(Collectors.toSet());
+ final Set<TopicPartition> topicPartitions = partitionInfoList
+ .stream()
+ .map(x -> new TopicPartition(x.topic(), x.partition()))
+ .collect(Collectors.toSet());
+
consumer.assign(topicPartitions);
consumer.seekToEnd(topicPartitions);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c4e24f185cc..084696d7ab9 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -26,6 +26,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingCluster;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -75,6 +78,7 @@
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.security.JaasUtils;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
@@ -101,6 +105,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
@@ -133,6 +138,7 @@
private static String kafkaHost;
private static DataSchema dataSchema;
private static int topicPostfix;
+ private static ZkUtils zkUtils;
private final int numThreads;
@@ -174,12 +180,19 @@ public static void setupClass() throws Exception
zkServer.getConnectString(),
null,
1,
- ImmutableMap.of("num.partitions", String.valueOf(NUM_PARTITIONS))
+ ImmutableMap.of(
+ "num.partitions",
+ String.valueOf(NUM_PARTITIONS),
+ "auto.create.topics.enable",
+ String.valueOf(false)
+ )
);
kafkaServer.start();
kafkaHost = StringUtils.format("localhost:%d", kafkaServer.getPort());
dataSchema = getDataSchema(DATASOURCE);
+
+ zkUtils = ZkUtils.apply(zkServer.getConnectString(), 30000, 30000,
JaasUtils.isZkSecurityEnabled());
}
@Before
@@ -238,6 +251,9 @@ public static void tearDownClass() throws IOException
zkServer.stop();
zkServer = null;
+
+ zkUtils.close();
+ zkUtils = null;
}
@Test
@@ -2200,7 +2216,8 @@ public void testCheckpointForUnknownTaskGroup() throws
InterruptedException
Thread.sleep(100);
}
-
Assert.assertTrue(serviceEmitter.getStackTrace().startsWith("org.apache.druid.java.util.common.ISE:
WTH?! cannot find"));
+ Assert.assertTrue(serviceEmitter.getStackTrace()
+
.startsWith("org.apache.druid.java.util.common.ISE: WTH?! cannot find"));
Assert.assertEquals(
"WTH?! cannot find taskGroup [0] among all taskGroups [{}]",
serviceEmitter.getExceptionMessage()
@@ -2532,6 +2549,9 @@ public void testFailedInitializationAndRecovery() throws
Exception
private void addSomeEvents(int numEventsPerPartition) throws Exception
{
+ //create topic manually
+ AdminUtils.createTopic(zkUtils, topic, NUM_PARTITIONS, 1, new
Properties(), RackAwareMode.Enforced$.MODULE$);
+
try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
for (int i = 0; i < NUM_PARTITIONS; i++) {
for (int j = 0; j < numEventsPerPartition; j++) {
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]