This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 333bd7c KAFKA-12340: Fix potential resource leak in
Kafka*BackingStore (#10153)
333bd7c is described below
commit 333bd7ce9fefd22647f7f2729cd528894e3e8bd9
Author: Randall Hauch <[email protected]>
AuthorDate: Fri Feb 19 11:49:56 2021 -0600
KAFKA-12340: Fix potential resource leak in Kafka*BackingStore (#10153)
These Kafka*BackingStore classes used in Connect have a recently-added
deprecated constructor, which is not used within AK. However, this commit
corrects an AdminClient resource leak if those deprecated constructors are used
outside of AK. The fix simply ensures that the AdminClient created by the
“default” supplier is always closed when the Kafka*BackingStore is stopped.
Author: Randall Hauch <[email protected]>
Reviewers: Konstantine Karantasis <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../connect/storage/KafkaConfigBackingStore.java | 19 +++++++++++++++++--
.../connect/storage/KafkaOffsetBackingStore.java | 19 +++++++++++++++++--
.../connect/storage/KafkaStatusBackingStore.java | 19 +++++++++++++++++--
3 files changed, 51 insertions(+), 6 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index d4e6358..dcfc28c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,6 +227,7 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
private final Map<String, Map<String, String>> connectorConfigs = new
HashMap<>();
private final Map<ConnectorTaskId, Map<String, String>> taskConfigs = new
HashMap<>();
private final Supplier<TopicAdmin> topicAdminSupplier;
+ private SharedTopicAdmin ownTopicAdmin;
// Set of connectors where we saw a task commit with an incomplete set of
task config updates, indicating the data
// is in an inconsistent state and we cannot safely use them until they
have been refreshed.
@@ -291,7 +293,13 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
@Override
public void stop() {
log.info("Closing KafkaConfigBackingStore");
- configLog.stop();
+ try {
+ configLog.stop();
+ } finally {
+ if (ownTopicAdmin != null) {
+ ownTopicAdmin.close();
+ }
+ }
log.info("Closed KafkaConfigBackingStore");
}
@@ -479,7 +487,14 @@ public class KafkaConfigBackingStore implements
ConfigBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config,
clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ?
topicAdminSupplier : () -> new TopicAdmin(adminProps);
+ Supplier<TopicAdmin> adminSupplier;
+ if (topicAdminSupplier != null) {
+ adminSupplier = topicAdminSupplier;
+ } else {
+ // Create our own topic admin supplier that we'll close when we're
stopped
+ ownTopicAdmin = new SharedTopicAdmin(adminProps);
+ adminSupplier = ownTopicAdmin;
+ }
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig)
config).configStorageTopicSettings()
: Collections.emptyMap();
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 26b47f9..313baf7 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -32,6 +32,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConvertingFutureCallback;
import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,6 +66,7 @@ public class KafkaOffsetBackingStore implements
OffsetBackingStore {
private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
private final Supplier<TopicAdmin> topicAdminSupplier;
+ private SharedTopicAdmin ownTopicAdmin;
@Deprecated
public KafkaOffsetBackingStore() {
@@ -98,7 +100,14 @@ public class KafkaOffsetBackingStore implements
OffsetBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config,
clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ?
topicAdminSupplier : () -> new TopicAdmin(adminProps);
+ Supplier<TopicAdmin> adminSupplier;
+ if (topicAdminSupplier != null) {
+ adminSupplier = topicAdminSupplier;
+ } else {
+ // Create our own topic admin supplier that we'll close when we're
stopped
+ ownTopicAdmin = new SharedTopicAdmin(adminProps);
+ adminSupplier = ownTopicAdmin;
+ }
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig)
config).offsetStorageTopicSettings()
: Collections.emptyMap();
@@ -140,7 +149,13 @@ public class KafkaOffsetBackingStore implements
OffsetBackingStore {
@Override
public void stop() {
log.info("Stopping KafkaOffsetBackingStore");
- offsetLog.stop();
+ try {
+ offsetLog.stop();
+ } finally {
+ if (ownTopicAdmin != null) {
+ ownTopicAdmin.close();
+ }
+ }
log.info("Stopped KafkaOffsetBackingStore");
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index abdbba8..bb73e46 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -44,6 +44,7 @@ import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.SharedTopicAdmin;
import org.apache.kafka.connect.util.Table;
import org.apache.kafka.connect.util.TopicAdmin;
import org.slf4j.Logger;
@@ -134,6 +135,7 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
private String statusTopic;
private KafkaBasedLog<String, byte[]> kafkaLog;
private int generation;
+ private SharedTopicAdmin ownTopicAdmin;
@Deprecated
public KafkaStatusBackingStore(Time time, Converter converter) {
@@ -177,7 +179,14 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
Map<String, Object> adminProps = new HashMap<>(originals);
ConnectUtils.addMetricsContextProperties(adminProps, config,
clusterId);
- Supplier<TopicAdmin> adminSupplier = topicAdminSupplier != null ?
topicAdminSupplier : () -> new TopicAdmin(adminProps);
+ Supplier<TopicAdmin> adminSupplier;
+ if (topicAdminSupplier != null) {
+ adminSupplier = topicAdminSupplier;
+ } else {
+ // Create our own topic admin supplier that we'll close when we're
stopped
+ ownTopicAdmin = new SharedTopicAdmin(adminProps);
+ adminSupplier = ownTopicAdmin;
+ }
Map<String, Object> topicSettings = config instanceof DistributedConfig
? ((DistributedConfig)
config).statusStorageTopicSettings()
@@ -226,7 +235,13 @@ public class KafkaStatusBackingStore implements
StatusBackingStore {
@Override
public void stop() {
- kafkaLog.stop();
+ try {
+ kafkaLog.stop();
+ } finally {
+ if (ownTopicAdmin != null) {
+ ownTopicAdmin.close();
+ }
+ }
}
@Override