Terry Beard created KAFKA-14565:
-----------------------------------
Summary: Add A No Implementation Default Open Method To Consumer
and Producer Interceptor Interfaces
Key: KAFKA-14565
URL: https://issues.apache.org/jira/browse/KAFKA-14565
Project: Kafka
Issue Type: Improvement
Components: clients
Reporter: Terry Beard
Assignee: Terry Beard
h2. PROBLEM
The Consumer and Producer interceptor interfaces and their corresponding Kafka
Consumer and Producer constructors do not adequately support cleanup of
underlying interceptor resources. [More
colors|https://issues.apache.org/jira/secure/CreateIssue.jspa#]
Currently within the Kafka Consumer and Kafka Producer constructors, the
AbstractConfig.getConfiguredInstances() is delagated responsibilty for both
creating and configuring each interceptor listed in the interceptor.classes
property and returns a configured List<ConsumerInterceptor<K,V>> interceptors.
h2. Kafka Consumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
{code}
h2. Kafka Producer Constructor
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
This dual responsibility for both creation and configuration is problematic
when it involves multiple interceptors where at least one interceptor's
configure method implementation creates and/or depends on objects which creates
threads, connections or other resources which requires clean up and the
subsequent interceptor's configure method raises a runtime exception. This
raising of the runtime exception results produces a resource leakage in the
first interceptor as the interceptor container i.e.
ConsumerInterceptors/ProducerInterceptors are never created and therefore the
first interceptor's and really any interceptor's close method are never called.
h2. KafkaConsumer Constructor
{code:java}
try {
....
List<ConsumerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId));
.... {code}
If the above line results in a runtime exception, the below this.interceptors
is never created.
{code:java}
this.interceptors = new ConsumerInterceptors<>(interceptorList); {code}
h2. Kafka Producer{color:#172b4d} Constructor{color}
{code:java}
try {
....
List<ProducerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
{code}
If the above line results in a runtime exception, the below this.interceptors
is never created.
{code:java}
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
.... {code}
Although, both Kafka Consumer and Kafka Producer constructors try/catch
implement close for resource clean up,
{code:java}
...
catch (Throwable t) {
// call close methods if internal objects are already constructed; this is
to prevent resource leak. see KAFKA-2121
// we do not need to call `close` at all when `log` is null, which means no
internal objects were initialized.
if (this.log != null) {
close(0, true);
}
// now propagate the exception
throw new KafkaException("Failed to construct kafka consumer", t);
} {code}
their respective close implementation located in the catch above never calls
the respective container interceptor close method below as the
{color:#172b4d}*{color:#ffab00}this{color}.{color:#403294}interceptors{color}*{color}{color:#403294}
{color}was never created.
{code:java}
private void close(long timeoutMs, boolean swallowException) {
....
Utils.closeQuietly(interceptors, "consumer interceptors", firstException);
.... {code}
This problem is magnified within a webserver cluster i.e. Confluent's REST
Proxy server where thousands of requests containing interceptor configuration
failures can occur in seconds resulting in an inadvertent DDoS attack as
cluster resources are quickly exhausted, disrupting all service activities.
h2. PROPOSAL
To help ensure the respective container interceptors are able to invoke their
respective interceptor close methods for proper resource clean up, I propose
defining a default open method with no implementation and a check exception on
the respective Consumer/Producer interceptor interfaces. This open method will
be responsible for creating threads and/or objects which utilizes threads,
connections or other resource which requires clean up. Additionally, the
default open method enables implementation optionality as it's empty default
behavior means it will do nothing on unimplemented classes of this interceptor
interface.
{code:java}
package org.apache.kafka.clients.consumer;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public interface ConsumerInterceptor<K, V> extends Configurable, AutoCloseable
{
ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
default void open() throws Exception {};
void close();
}
{code}
{code:java}
package org.apache.kafka.clients.producer;
import org.apache.kafka.common.Configurable;
public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
void onAcknowledgement(RecordMetadata metadata, Exception exception);
default void open() throws Exception {};
void close();
}
{code}
{color:#172b4d}Additionally, the Kafka Consumer/Producer Interceptor containers
will implement a corresponding maybeOpen method which throws a checked
Exception. It's called maybeOpen for backwards compatibility purpose as it
must determine whether an interceptor's interface contains the newer open
method before calling it accordingly. {color}
{color:#172b4d}{*}NOTE{*}: Developers are encouraged to throw a more specific
exception.{color}
{code:java}
package org.apache.kafka.clients.consumer.internals;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.List;
import java.util.Arrays;
import java.util.Map;/**
public class ConsumerInterceptors<K, V> implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(ConsumerInterceptors.class);
private final List<ConsumerInterceptor<K, V>> interceptors;
public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
this.interceptors = interceptors;
}
public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
ConsumerRecords<K, V> interceptRecords = records;
for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
try {
interceptRecords = interceptor.onConsume(interceptRecords);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue
calling other interceptors
log.warn("Error executing interceptor onConsume callback", e);
}
}
return interceptRecords;
}
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
try {
interceptor.onCommit(offsets);
} catch (Exception e) {
// do not propagate interceptor exception, just log
log.warn("Error executing interceptor onCommit callback", e);
}
}
}
@Override
public void close() {
for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
try {
interceptor.close();
} catch (Exception e) {
log.error("Failed to close consumer interceptor ", e);
}
}
} public List<ConsumerInterceptor<K, V>> getInterceptors() {
return interceptors;
}
/**
* Only interceptors which implement {@link ConsumerInterceptor#open()} are
called by the container. This is for backwards
* compatibility as older interceptors do not contain the default open()
* */
public void maybeOpen() throws Exception {
for (ConsumerInterceptor<K, V> interceptor : this.getInterceptors()) {
try {
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method ->
method.getName() == "open")){
interceptor.open();
}
} catch (Exception e) {
log.error("Failed to open consumer interceptor ", e);
throw e;
}
}
}
} {code}
{code:java}
package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.Closeable;
import java.util.Arrays;
import java.util.List;/**
public class ProducerInterceptors<K, V> implements Closeable {
private static final Logger log =
LoggerFactory.getLogger(ProducerInterceptors.class);
private final List<ProducerInterceptor<K, V>> interceptors;
public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
this.interceptors = interceptors;
}
public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
ProducerRecord<K, V> interceptRecord = record;
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptRecord = interceptor.onSend(interceptRecord);
} catch (Exception e) {
// do not propagate interceptor exception, log and continue
calling other interceptors
// be careful not to throw exception from here
if (record != null)
log.warn("Error executing interceptor onSend callback for
topic: {}, partition: {}", record.topic(), record.partition(), e);
else
log.warn("Error executing interceptor onSend callback", e);
}
}
return interceptRecord;
}
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptor.onAcknowledgement(metadata, exception);
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement
callback", e);
}
}
}
public void onSendError(ProducerRecord<K, V> record, TopicPartition
interceptTopicPartition, Exception exception) {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
if (record == null && interceptTopicPartition == null) {
interceptor.onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
interceptTopicPartition = extractTopicPartition(record);
}
interceptor.onAcknowledgement(new
RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1),
exception);
}
} catch (Exception e) {
// do not propagate interceptor exceptions, just log
log.warn("Error executing interceptor onAcknowledgement
callback", e);
}
}
}
public static <K, V> TopicPartition extractTopicPartition(ProducerRecord<K,
V> record) {
return new TopicPartition(record.topic(), record.partition() == null ?
RecordMetadata.UNKNOWN_PARTITION : record.partition());
}
@Override
public void close() {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
interceptor.close();
} catch (Exception e) {
log.error("Failed to close producer interceptor ", e);
}
}
}
/**
* Only interceptors which implement {@link ProducerInterceptor#open()} are
called by the container. This is for backwards
* compatibility as older interceptors do not contain the default open()
* */
public void maybeOpen() throws Exception {
for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
try {
if(Arrays.stream(interceptor.getClass().getMethods()).anyMatch(method ->
method.getName() == "open")){
interceptor.open();
}
} catch (Exception e) {
log.error("Failed to open producer interceptor ", e);
throw e;
}
}
}
}
{code}
In summary, the overall workflow is that after the configured interceptor
instances are returned by the AbstractConfig.getConfiguredInstances(), the
Kafka Consumer/Producer constructor's respective interceptor container
maybeOpen method will be called.
If in the maybeOpen call, an exception occurs following the interceptor open
method call, the respective client constructor's try/catch will call the
interceptor container's close method which in-turn loops through and calls each
interceptor's close method for clean up of resources allocated in the
interceptor open method.
If an exception occurs in the configure method all objects will be garbage
collected as this method must no longer be used for creating threads and/or
objects which utilizes threads, connections or other resources which requires
clean up.
h2. Kafka Consumer Constructor maybeOpen example
{code:java}
...
List<ConsumerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class,
Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId)); this.interceptors = new
ConsumerInterceptors<>(interceptorList);
this.interceptors.maybeOpen();
...{code}
h2. Kafka Producer {color:#172b4d}maybeOpen{color} example
{code:java}
...
List<ProducerInterceptor<K, V>> interceptorList = (List)
config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG,
clientId));
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
this.interceptors.maybeOpen();
...{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)