This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new feee616f738 MINOR: Query before creating the internal remote log
metadata topic (#14755)
feee616f738 is described below
commit feee616f738a059d6de3e5ec32fba6e9c2b23fb9
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Mon Nov 20 14:50:11 2023 +0530
MINOR: Query before creating the internal remote log metadata topic (#14755)
When a node starts or restarts, it sends a CREATE_TOPICS request to
the controller to create the internal __remote_log_metadata topic.
Topic creation is a costly operation that is handled by the controller. During
a rebalance, the controller can have pending requests in its queue, which can
lead to a CREATE_TOPICS timeout. Instead of firing the CREATE_TOPICS request
every time a node restarts, first send a METADATA request (topic describe),
which is handled by the least loaded node, and send the request to create the
topic only if the topic does not already exist.
Reviewers: Satish Duggana <[email protected]>, Christo Lolov
<[email protected]>
---
.../TopicBasedRemoteLogMetadataManager.java | 53 ++++++++++++++++++----
.../TopicBasedRemoteLogMetadataManagerTest.java | 25 ++++++++++
2 files changed, 68 insertions(+), 10 deletions(-)
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index 9c6a2089db4..938238fae73 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -17,6 +17,7 @@
package org.apache.kafka.server.log.remote.metadata.storage;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -45,6 +46,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -54,6 +56,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
/**
* This is the {@link RemoteLogMetadataManager} implementation with storage as
an internal topic with name {@link
TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME}.
@@ -446,6 +449,20 @@ public class TopicBasedRemoteLogMetadataManager implements
RemoteLogMetadataMana
}
}
+ boolean doesTopicExist(Admin adminClient, String topic) {
+ try {
+ TopicDescription description =
adminClient.describeTopics(Collections.singleton(topic))
+ .topicNameValues()
+ .get(topic)
+ .get();
+ log.info("Topic {} exists. Description: {}", topic, description);
+ return description != null;
+ } catch (ExecutionException | InterruptedException ex) {
+ log.info("Topic {} does not exist. Error: {}", topic,
ex.getCause().getMessage());
+ return false;
+ }
+ }
+
private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
String topicName) throws
InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and
replication factor.");
@@ -467,30 +484,46 @@ public class TopicBasedRemoteLogMetadataManager
implements RemoteLogMetadataMana
Map<String, String> topicConfigs = new HashMap<>();
topicConfigs.put(TopicConfig.RETENTION_MS_CONFIG,
Long.toString(rlmmConfig.metadataTopicRetentionMs()));
topicConfigs.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE);
+ topicConfigs.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG,
"false");
return new NewTopic(rlmmConfig.remoteLogMetadataTopicName(),
rlmmConfig.metadataTopicPartitionsCount(),
rlmmConfig.metadataTopicReplicationFactor()).configs(topicConfigs);
}
/**
- * @param topic topic to be created.
+ * @param newTopic topic to be created.
* @return Returns true if the topic already exists, or it is created
successfully.
*/
- private boolean createTopic(Admin adminClient, NewTopic topic) {
- boolean topicCreated = false;
+ private boolean createTopic(Admin adminClient, NewTopic newTopic) {
+ boolean doesTopicExist = false;
+ String topic = newTopic.name();
try {
- adminClient.createTopics(Collections.singleton(topic)).all().get();
- topicCreated = true;
+ doesTopicExist = doesTopicExist(adminClient, topic);
+ if (!doesTopicExist) {
+ CreateTopicsResult result =
adminClient.createTopics(Collections.singleton(newTopic));
+ result.all().get();
+ List<String> overriddenConfigs = result.config(topic).get()
+ .entries()
+ .stream()
+ .filter(entry -> !entry.isDefault())
+ .map(entry -> entry.name() + "=" + entry.value())
+ .collect(Collectors.toList());
+ log.info("Topic {} created. TopicId: {}, numPartitions: {},
replicationFactor: {}, config: {}",
+ topic, result.topicId(topic).get(),
result.numPartitions(topic).get(),
+ result.replicationFactor(topic).get(),
overriddenConfigs);
+ doesTopicExist = true;
+ }
} catch (Exception e) {
+ // This exception can still occur as multiple brokers may call
create topics and one of them may become
+ // successful and other would throw TopicExistsException
if (e.getCause() instanceof TopicExistsException) {
- log.info("Topic [{}] already exists", topic.name());
- topicCreated = true;
+ log.info("Topic [{}] already exists", topic);
+ doesTopicExist = true;
} else {
- log.error("Encountered error while creating remote log
metadata topic.", e);
+ log.error("Encountered error while creating {} topic.", topic,
e);
}
}
-
- return topicCreated;
+ return doesTopicExist;
}
public boolean isInitialized() {
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index 96e48de8a73..1f2bebbd296 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
@@ -40,6 +42,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.TimeoutException;
@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for
usages of JavaConverters
@@ -66,6 +69,28 @@ public class TopicBasedRemoteLogMetadataManagerTest {
return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
}
+ @Test
+ public void testInternalTopicExists() {
+ Properties adminConfig =
remoteLogMetadataManagerHarness.adminClientConfig();
+ ListenerName listenerName =
remoteLogMetadataManagerHarness.listenerName();
+ try (Admin admin =
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+ String topic =
topicBasedRlmm().config().remoteLogMetadataTopicName();
+ boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin,
topic);
+ Assertions.assertTrue(doesTopicExist);
+ }
+ }
+
+ @Test
+ public void testTopicDoesNotExists() {
+ Properties adminConfig =
remoteLogMetadataManagerHarness.adminClientConfig();
+ ListenerName listenerName =
remoteLogMetadataManagerHarness.listenerName();
+ try (Admin admin =
remoteLogMetadataManagerHarness.createAdminClient(listenerName, adminConfig)) {
+ String topic = "dummy-test-topic";
+ boolean doesTopicExist = topicBasedRlmm().doesTopicExist(admin,
topic);
+ Assertions.assertFalse(doesTopicExist);
+ }
+ }
+
@Test
public void testWithNoAssignedPartitions() throws Exception {
// This test checks simple lifecycle of
TopicBasedRemoteLogMetadataManager with out assigning any leader/follower
partitions.