Repository: kafka
Updated Branches:
  refs/heads/trunk fe3b7492b -> d5b43b19b


KAFKA-3162: Added producer and consumer interceptors

This implements most of KIP-42 (producer and consumer interceptors), except 
for exposing CRC and record sizes to the interceptors, which is coming in a 
separate PR (tracked by KAFKA-3196).

This PR includes:
1. Adding the ProducerInterceptor interface and calling its callbacks from 
the appropriate places in KafkaProducer.
2. Adding the ConsumerInterceptor interface and calling its callbacks from 
the appropriate places in KafkaConsumer.
3. Adding unit tests for the interceptor changes.
4. Adding an integration test for both record-mutating producer and consumer 
interceptors; a configuration sketch follows this list.
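
For reference, a minimal sketch of how a client would enable interceptors 
through the new "interceptor.classes" setting introduced by this patch (the 
interceptor class name below is a hypothetical placeholder, not part of this 
commit):

    // Hypothetical example: enable a producer interceptor via configuration.
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
    // the class must implement org.apache.kafka.clients.producer.ProducerInterceptor
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "com.example.AuditProducerInterceptor");
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);

The consumer side is symmetric: set ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 
to a comma-separated list of ConsumerInterceptor implementations.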

Author: Anna Povzner <[email protected]>

Reviewers: Jason Gustafson, Ismael Juma, Gwen Shapira

Closes #854 from apovzner/kip42


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d5b43b19
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d5b43b19
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d5b43b19

Branch: refs/heads/trunk
Commit: d5b43b19bb06e9cdc606312c8bcf87ed267daf44
Parents: fe3b749
Author: Anna Povzner <[email protected]>
Authored: Tue Feb 9 22:10:06 2016 -0700
Committer: Gwen Shapira <[email protected]>
Committed: Tue Feb 9 22:10:06 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/ConsumerConfig.java  |  10 ++
 .../clients/consumer/ConsumerInterceptor.java   |  72 ++++++++
 .../kafka/clients/consumer/KafkaConsumer.java   |  15 +-
 .../consumer/internals/ConsumerCoordinator.java |  12 +-
 .../internals/ConsumerInterceptors.java         |  99 +++++++++++
 .../kafka/clients/producer/KafkaProducer.java   |  54 ++++++
 .../kafka/clients/producer/ProducerConfig.java  |  11 ++
 .../clients/producer/ProducerInterceptor.java   |  88 ++++++++++
 .../internals/ProducerInterceptors.java         | 106 +++++++++++
 .../kafka/common/config/AbstractConfig.java     |  10 ++
 .../clients/consumer/KafkaConsumerTest.java     |  24 +++
 .../internals/ConsumerCoordinatorTest.java      |   3 +-
 .../internals/ConsumerInterceptorsTest.java     | 174 +++++++++++++++++++
 .../clients/producer/KafkaProducerTest.java     |  25 +++
 .../internals/ProducerInterceptorsTest.java     | 147 ++++++++++++++++
 .../kafka/test/MockConsumerInterceptor.java     |  80 +++++++++
 .../kafka/test/MockProducerInterceptor.java     |  85 +++++++++
 .../kafka/api/BaseConsumerTest.scala            |  29 ++--
 .../kafka/api/PlaintextConsumerTest.scala       | 157 ++++++++++++++++-
 19 files changed, 1182 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index df192b9..3132cae 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -162,6 +162,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>interceptor.classes</code> */
+    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+    public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
+                                                        + "Implementing the <code>ConsumerInterceptor</code> interface allows you to intercept (and possibly mutate) records "
+                                                        + "received by the consumer. By default, there are no interceptors.";
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
@@ -296,6 +301,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         9 * 60 * 1000,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                                .define(INTERCEPTOR_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        null,
+                                        Importance.LOW,
+                                        INTERCEPTOR_CLASSES_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
new file mode 100644
index 0000000..5c13a41
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) records received by the consumer. A primary use case
+ * is for third-party components to hook into consumer applications for custom monitoring, logging, etc.
+ *
+ * <p>
+ * This class will get consumer config properties via the <code>configure()</code> method, including the clientId assigned
+ * by KafkaConsumer if not specified in the consumer config. The interceptor implementation needs to be aware that it will be
+ * sharing the consumer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
+ * <p>
+ * Exceptions thrown by ConsumerInterceptor methods will be caught and logged, but not propagated further. As a result, if
+ * the user configures the interceptor with the wrong key and value type parameters, the consumer will not throw an exception,
+ * it will just log the errors.
+ * <p>
+ * ConsumerInterceptor callbacks are called from the same thread that invokes {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
+ */
+public interface ConsumerInterceptor<K, V> extends Configurable {
+
+    /**
+     * This is called just before the records are returned by {@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
+     * <p>
+     * This method is allowed to modify consumer records, in which case the new records will be returned.
+     * Any exception thrown by this method will be caught by the caller and logged, but not propagated to the client.
+     * <p>
+     * Since the consumer may run multiple interceptors, a particular interceptor's onConsume() callback will be called
+     * in the order specified by {@link org.apache.kafka.clients.consumer.ConsumerConfig#INTERCEPTOR_CLASSES_CONFIG}.
+     * The first interceptor in the list gets the consumed records, the following interceptor will be passed the records returned
+     * by the previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
+     * the records already modified by other interceptors. However, building a pipeline of mutating interceptors that depend on the output
+     * of the previous interceptor is discouraged, because of potential side effects caused by interceptors failing
+     * to modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onConsume(),
+     * the exception is caught and logged, and the next interceptor is called with the records returned by the last successful interceptor
+     * in the list, or otherwise the original consumed records.
+     *
+     * @param records records to be consumed by the client or records returned by the previous interceptors in the list.
+     * @return records that are either modified by the interceptor or the same as the records passed to this method.
+     */
+    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records);
+
+    /**
+     * This is called when offsets get committed.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     *
+     * @param offsets A map of offsets by partition with associated metadata
+     */
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+    /**
+     * This is called when the interceptor is closed.
+     */
+    public void close();
+}
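
For illustration, a minimal sketch of an implementation (a hypothetical class, 
not part of this patch): it logs how many records each poll() returned and 
leaves the records unmodified.

    package com.example;  // hypothetical package

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerInterceptor;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CountingConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {

        @Override
        public void configure(Map<String, ?> configs) {
            // receives the consumer config, including the (possibly generated) client.id
        }

        @Override
        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
            // runs in the user thread, just before poll() returns
            System.out.println("poll() is returning " + records.count() + " records");
            return records;  // a modified copy may be returned instead
        }

        @Override
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
            System.out.println("committed offsets for " + offsets.size() + " partitions");
        }

        @Override
        public void close() {
            // release any resources held by the interceptor
        }
    }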

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c14cc68..faa9a78 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.Fetcher;
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
+import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -464,6 +465,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
     private final Fetcher<K, V> fetcher;
+    private final ConsumerInterceptors<K, V> interceptors;
 
     private final Time time;
     private final ConsumerNetworkClient client;
@@ -589,6 +591,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
+            // load interceptors and make sure they get clientId
+            Map<String, Object> userProvidedConfigs = config.originals();
+            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ConsumerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
             this.coordinator = new ConsumerCoordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     config.getInt(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG),
@@ -602,7 +610,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     retryBackoffMs,
                     new ConsumerCoordinator.DefaultOffsetCommitCallback(),
                     config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
-                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG));
+                    config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
+                    this.interceptors);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -860,7 +869,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     // auto-committing offsets
                     fetcher.initFetches(metadata.fetch());
                     client.quickPoll();
-                    return new ConsumerRecords<>(records);
+                    return this.interceptors == null
+                        ? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }
 
                 long elapsed = time.milliseconds() - start;
@@ -1273,6 +1283,7 @@ public class KafkaConsumer<K, V> {
         AtomicReference<Throwable> firstException = new AtomicReference<>();
         this.closed = true;
         ClientUtils.closeQuietly(coordinator, "coordinator", firstException);
+        ClientUtils.closeQuietly(interceptors, "consumer interceptors", firstException);
         ClientUtils.closeQuietly(metrics, "consumer metrics", firstException);
         ClientUtils.closeQuietly(client, "consumer network client", firstException);
         ClientUtils.closeQuietly(keyDeserializer, "consumer key deserializer", firstException);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 41d2a27..aa39e11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -68,6 +68,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     private final OffsetCommitCallback defaultOffsetCommitCallback;
     private final boolean autoCommitEnabled;
     private final AutoCommitTask autoCommitTask;
+    private final ConsumerInterceptors interceptors;
 
     /**
      * Initialize the coordination manager.
@@ -85,7 +86,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
                                long retryBackoffMs,
                                OffsetCommitCallback defaultOffsetCommitCallback,
                                boolean autoCommitEnabled,
-                               long autoCommitIntervalMs) {
+                               long autoCommitIntervalMs,
+                               ConsumerInterceptors interceptors) {
         super(client,
                 groupId,
                 sessionTimeoutMs,
@@ -107,6 +109,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
+        this.interceptors = interceptors;
     }
 
     @Override
@@ -326,6 +329,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         future.addListener(new RequestFutureListener<Void>() {
             @Override
             public void onSuccess(Void value) {
+                if (interceptors != null)
+                    interceptors.onCommit(offsets);
                 cb.onComplete(offsets, null);
             }
 
@@ -354,8 +359,11 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
             client.poll(future);
 
-            if (future.succeeded())
+            if (future.succeeded()) {
+                if (interceptors != null)
+                    interceptors.onCommit(offsets);
                 return;
+            }
 
             if (!future.isRetriable())
                 throw future.exception();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
new file mode 100644
index 0000000..f22686e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptors.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A container that holds the list of {@link org.apache.kafka.clients.consumer.ConsumerInterceptor}
+ * instances and wraps calls to the chain of custom interceptors.
+ */
+public class ConsumerInterceptors<K, V> implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerInterceptors.class);
+    private final List<ConsumerInterceptor<K, V>> interceptors;
+
+    public ConsumerInterceptors(List<ConsumerInterceptor<K, V>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when the records are about to be returned to the user.
+     * <p>
+     * This method calls {@link ConsumerInterceptor#onConsume(ConsumerRecords)} for each
+     * interceptor. Records returned from each interceptor get passed to onConsume() of the next interceptor
+     * in the chain of interceptors.
+     * <p>
+     * This method does not throw exceptions. If any of the interceptors in the chain throws an exception,
+     * it gets caught and logged, and the next interceptor in the chain is called with the records returned by the
+     * last successful interceptor's onConsume() call.
+     *
+     * @param records records to be consumed by the client.
+     * @return records that are either modified by interceptors or the same as the records passed to this method.
+     */
+    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
+        ConsumerRecords<K, V> interceptRecords = records;
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptRecords = interceptor.onConsume(interceptRecords);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, log and continue calling other interceptors
+                log.warn("Error executing interceptor onConsume callback", e);
+            }
+        }
+        return interceptRecords;
+    }
+
+    /**
+     * This is called when a commit request returns successfully from the broker.
+     * <p>
+     * This method calls the {@link ConsumerInterceptor#onCommit(Map)} method for each interceptor.
+     * <p>
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptors in the chain are logged, but not propagated.
+     *
+     * @param offsets A map of offsets by partition with associated metadata
+     */
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.onCommit(offsets);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, just log
+                log.warn("Error executing interceptor onCommit callback", e);
+            }
+        }
+    }
+
+    /**
+     * Closes every interceptor in the container.
+     */
+    @Override
+    public void close() {
+        for (ConsumerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.close();
+            } catch (Exception e) {
+                log.error("Failed to close consumer interceptor ", e);
+            }
+        }
+    }
+}
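
To make the chaining contract concrete, a small usage sketch (assuming the 
hypothetical CountingConsumerInterceptor sketched earlier and a previously 
fetched ConsumerRecords<String, String> instance named 'records'):

    // records flow through the interceptors in list order
    List<ConsumerInterceptor<String, String>> list = new ArrayList<>();
    list.add(new CountingConsumerInterceptor<String, String>());
    list.add(new CountingConsumerInterceptor<String, String>());
    ConsumerInterceptors<String, String> chain = new ConsumerInterceptors<>(list);

    // each interceptor sees the output of the previous one; if one throws,
    // the exception is logged and the next one gets the last good result
    ConsumerRecords<String, String> intercepted = chain.onConsume(records);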

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 3fc2a19..a76dc1a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -60,6 +61,7 @@ import org.apache.kafka.common.utils.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
@@ -147,6 +149,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final ProducerConfig producerConfig;
     private final long maxBlockTimeMs;
     private final int requestTimeoutMs;
+    private final ProducerInterceptors<K, V> interceptors;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -313,6 +316,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
                 this.valueSerializer = valueSerializer;
             }
+
+            // load interceptors and make sure they get clientId
+            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ProducerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
             log.debug("Kafka producer started");
@@ -410,6 +420,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
+        // intercept the record, which can be potentially modified; this method does not throw exceptions
+        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+        // producer callback will make sure to call both 'callback' and interceptor callback
+        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors);
+        return doSend(interceptedRecord, interceptCallback);
+    }
+
+    /**
+     * Implementation of asynchronously sending a record to a topic.
+     * See {@link #send(ProducerRecord, Callback)} for details.
+     */
+    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
         try {
             // first make sure the metadata for the topic is available
             long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);
@@ -452,13 +474,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
+            throw e;
+        } catch (Exception e) {
+            // we notify interceptors about all exceptions, since onSend is called before anything else in this method
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(null, e);
             throw e;
         }
     }
@@ -650,6 +683,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
         }
 
+        ClientUtils.closeQuietly(interceptors, "producer interceptors", firstException);
         ClientUtils.closeQuietly(metrics, "producer metrics", firstException);
         ClientUtils.closeQuietly(keySerializer, "producer keySerializer", firstException);
         ClientUtils.closeQuietly(valueSerializer, "producer valueSerializer", firstException);
@@ -716,4 +750,24 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
     }
 
+    /**
+     * A callback called when the producer request is complete. It in turn calls the user-supplied callback (if given) and
+     * notifies the producer interceptors about the request completion.
+     */
+    private static class InterceptorCallback<K, V> implements Callback {
+        private final Callback userCallback;
+        private final ProducerInterceptors<K, V> interceptors;
+
+        public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors) {
+            this.userCallback = userCallback;
+            this.interceptors = interceptors;
+        }
+
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (this.interceptors != null)
+                this.interceptors.onAcknowledgement(metadata, exception);
+            if (this.userCallback != null)
+                this.userCallback.onCompletion(metadata, exception);
+        }
+    }
 }
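
From the client's point of view, the resulting callback ordering looks like 
the following sketch (topic name and callback body are hypothetical): for 
each completed send, the interceptors' onAcknowledgement() runs first, then 
the user-supplied callback.

    producer.send(new ProducerRecord<String, String>("my-topic", "key", "value"),
            new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // runs after ProducerInterceptor.onAcknowledgement()
                    if (exception != null)
                        exception.printStackTrace();
                }
            });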

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index ae9aa08..ee2b142 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -200,6 +200,12 @@ public class ProducerConfig extends AbstractConfig {
     public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
     private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
 
+    /** <code>interceptor.classes</code> */
+    public static final String INTERCEPTOR_CLASSES_CONFIG = "interceptor.classes";
+    public static final String INTERCEPTOR_CLASSES_DOC = "A list of classes to use as interceptors. "
+                                                        + "Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
+                                                        + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -277,6 +283,11 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.CLASS,
                                         DefaultPartitioner.class.getName(),
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
+                                .define(INTERCEPTOR_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        null,
+                                        Importance.LOW,
+                                        INTERCEPTOR_CLASSES_DOC)
 
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
new file mode 100644
index 0000000..aa18fdc
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.Configurable;
+
+/**
+ * A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
+ * they are published to the Kafka cluster.
+ * <p>
+ * This class will get producer config properties via the <code>configure()</code> method, including the clientId assigned
+ * by KafkaProducer if not specified in the producer config. The interceptor implementation needs to be aware that it will be
+ * sharing the producer config namespace with other interceptors and serializers, and ensure that there are no conflicts.
+ * <p>
+ * Exceptions thrown by ProducerInterceptor methods will be caught and logged, but not propagated further. As a result, if
+ * the user configures the interceptor with the wrong key and value type parameters, the producer will not throw an exception,
+ * it will just log the errors.
+ * <p>
+ * ProducerInterceptor callbacks may be called from multiple threads. The interceptor implementation must ensure thread-safety, if needed.
+ */
+public interface ProducerInterceptor<K, V> extends Configurable {
+    /**
+     * This is called from the {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} and
+     * {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)} methods, before the key and value
+     * get serialized and a partition is assigned (if a partition is not specified in the ProducerRecord).
+     * <p>
+     * This method is allowed to modify the record, in which case the new record will be returned. The implication of modifying
+     * the key/value is that partition assignment (if not specified in the ProducerRecord) will be done based on the modified key/value,
+     * not the key/value from the client. Consequently, key and value transformations done in onSend() need to be consistent:
+     * the same key and value should mutate to the same (modified) key and value. Otherwise, log compaction would not work
+     * as expected.
+     * <p>
+     * Similarly, it is up to the interceptor implementation to ensure that the correct topic/partition is returned in the ProducerRecord.
+     * Most often, it should be the same topic/partition as in 'record'.
+     * <p>
+     * Any exception thrown by this method will be caught by the caller and logged, but not propagated further.
+     * <p>
+     * Since the producer may run multiple interceptors, a particular interceptor's onSend() callback will be called in the order
+     * specified by {@link org.apache.kafka.clients.producer.ProducerConfig#INTERCEPTOR_CLASSES_CONFIG}. The first interceptor
+     * in the list gets the record passed from the client, the following interceptor will be passed the record returned by the
+     * previous interceptor, and so on. Since interceptors are allowed to modify records, interceptors may potentially get
+     * the record already modified by other interceptors. However, building a pipeline of mutating interceptors that depend on the output
+     * of the previous interceptor is discouraged, because of potential side effects caused by interceptors failing to
+     * modify the record and throwing an exception. If one of the interceptors in the list throws an exception from onSend(), the exception
+     * is caught and logged, and the next interceptor is called with the record returned by the last successful interceptor in the list,
+     * or otherwise the original record from the client.
+     *
+     * @param record the record from the client or the record returned by the previous interceptor in the chain of interceptors.
+     * @return the producer record to send to the topic/partition
+     */
+    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record);
+
+    /**
+     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
+     * throws an exception.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     */
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception);
+
+    /**
+     * This is called when the interceptor is closed.
+     */
+    public void close();
+}
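
For illustration, a minimal sketch of an implementation (a hypothetical class, 
not part of this patch): it prefixes every value in onSend() and counts 
acknowledgements. Because the rewrite is deterministic, the same input value 
always maps to the same output value, as the onSend() contract recommends.

    package com.example;  // hypothetical package

    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    public class PrefixingProducerInterceptor implements ProducerInterceptor<String, String> {
        // callbacks may run on multiple threads, so keep state thread-safe
        private final AtomicLong ackCount = new AtomicLong();

        @Override
        public void configure(Map<String, ?> configs) {
            // receives the producer config, including the (possibly generated) client.id
        }

        @Override
        public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
            // return a new record with the same topic/partition/key and a modified value
            return new ProducerRecord<String, String>(record.topic(), record.partition(),
                    record.key(), "intercepted: " + record.value());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // runs in the background I/O thread, so keep this fast
            if (exception == null)
                ackCount.incrementAndGet();
        }

        @Override
        public void close() {
            System.out.println("acknowledged " + ackCount.get() + " records");
        }
    }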

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
new file mode 100644
index 0000000..9343a2e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * A container that holds the list of {@link org.apache.kafka.clients.producer.ProducerInterceptor}
+ * instances and wraps calls to the chain of custom interceptors.
+ */
+public class ProducerInterceptors<K, V> implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ProducerInterceptors.class);
+    private final List<ProducerInterceptor<K, V>> interceptors;
+
+    public ProducerInterceptors(List<ProducerInterceptor<K, V>> interceptors) {
+        this.interceptors = interceptors;
+    }
+
+    /**
+     * This is called when the client sends a record to KafkaProducer, before the key and value get serialized.
+     * The method calls the {@link ProducerInterceptor#onSend(ProducerRecord)} method. The ProducerRecord
+     * returned from the first interceptor's onSend() is passed to the second interceptor's onSend(), and so on in the
+     * interceptor chain. The record returned from the last interceptor is returned from this method.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptor methods are caught and ignored.
+     * If an interceptor in the middle of the chain, one that normally modifies the record, throws an exception,
+     * the next interceptor in the chain will be called with the record returned by the previous interceptor that did not
+     * throw an exception.
+     *
+     * @param record the record from the client
+     * @return the producer record to send to the topic/partition
+     */
+    public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
+        ProducerRecord<K, V> interceptRecord = record;
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptRecord = interceptor.onSend(interceptRecord);
+            } catch (Exception e) {
+                // do not propagate interceptor exception, log and continue calling other interceptors
+                // be careful not to throw an exception from here
+                if (record != null)
+                    log.warn("Error executing interceptor onSend callback for topic: {}, partition: {}", record.topic(), record.partition(), e);
+                else
+                    log.warn("Error executing interceptor onSend callback", e);
+            }
+        }
+        return interceptRecord;
+    }
+
+    /**
+     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
+     * it gets sent to the server. This method calls the {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
+     * method for each interceptor.
+     *
+     * This method does not throw exceptions. Exceptions thrown by any of the interceptor methods are caught and ignored.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset). Null if an error occurred.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     */
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.onAcknowledgement(metadata, exception);
+            } catch (Exception e) {
+                // do not propagate interceptor exceptions, just log
+                log.warn("Error executing interceptor onAcknowledgement callback", e);
+            }
+        }
+    }
+
+    /**
+     * Closes every interceptor in the container.
+     */
+    @Override
+    public void close() {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                interceptor.close();
+            } catch (Exception e) {
+                log.error("Failed to close producer interceptor ", e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index f9b6cdf..b44f72c 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -199,9 +199,19 @@ public class AbstractConfig {
         return t.cast(o);
     }
 
+    /**
+     * Get a list of configured instances of the given class specified by the given configuration key. The configuration
+     * may specify either null or an empty string to indicate no configured instances. In both cases, this method
+     * returns an empty list to indicate no configured instances.
+     * @param key The configuration key for the class
+     * @param t The interface the class should implement
+     * @return The list of configured instances
+     */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
         List<String> klasses = getList(key);
         List<T> objects = new ArrayList<T>();
+        if (klasses == null)
+            return objects;
         for (String klass : klasses) {
             Object o;
             try {
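
A small sketch of the new null handling (assuming a config map named 
'configsWithoutInterceptors' that has no interceptor.classes entry): the 
lookup now degrades to an empty list instead of failing.

    // interceptor.classes is unset, so getList() returns null and
    // getConfiguredInstances() now returns an empty list
    ConsumerConfig config = new ConsumerConfig(configsWithoutInterceptors);
    List<ConsumerInterceptor> interceptors =
            config.getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    ConsumerInterceptor.class);
    assert interceptors.isEmpty();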

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 5711852..c65fd73 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.consumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.Assert;
 import org.junit.Test;
@@ -92,4 +94,26 @@ public class KafkaConsumerTest {
         consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
         consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
     }
+
+    @Test
+    public void testInterceptorConstructorClose() throws Exception {
+        try {
+            Properties props = new Properties();
+            // test with client ID assigned by KafkaConsumer
+            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MockConsumerInterceptor.class.getName());
+
+            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
+                    props, new StringDeserializer(), new StringDeserializer());
+            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(0, MockConsumerInterceptor.CLOSE_COUNT.get());
+
+            consumer.close();
+            Assert.assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
+        } finally {
+            // cleanup since we are using mutable static variables in MockConsumerInterceptor
+            MockConsumerInterceptor.resetCounters();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3ae1a36..0b8a162 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -913,7 +913,8 @@ public class ConsumerCoordinatorTest {
                 retryBackoffMs,
                 defaultOffsetCommitCallback,
                 autoCommitEnabled,
-                autoCommitIntervalMs);
+                autoCommitIntervalMs,
+                null);
     }
 
     private Struct consumerMetadataResponse(Node node, short error) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
new file mode 100644
index 0000000..45210a8
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerInterceptorsTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ConsumerInterceptorsTest {
+    private final int filterPartition1 = 5;
+    private final int filterPartition2 = 6;
+    private final String topic = "test";
+    private final int partition = 1;
+    private final TopicPartition tp = new TopicPartition(topic, partition);
+    private final TopicPartition filterTopicPart1 = new TopicPartition("test5", filterPartition1);
+    private final TopicPartition filterTopicPart2 = new TopicPartition("test6", filterPartition2);
+    private final ConsumerRecord<Integer, Integer> consumerRecord = new ConsumerRecord<>(topic, partition, 0, 1, 1);
+    private int onCommitCount = 0;
+    private int onConsumeCount = 0;
+
+    /**
+     * Test consumer interceptor that filters records in the onConsume() intercept
+     */
+    private class FilterConsumerInterceptor<K, V> implements ConsumerInterceptor<K, V> {
+        private int filterPartition;
+        private boolean throwExceptionOnConsume = false;
+        private boolean throwExceptionOnCommit = false;
+
+        FilterConsumerInterceptor(int filterPartition) {
+            this.filterPartition = filterPartition;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
+            onConsumeCount++;
+            if (throwExceptionOnConsume)
+                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onConsume.");
+
+            // filters out topic/partitions with partition == filterPartition
+            Map<TopicPartition, List<ConsumerRecord<K, V>>> recordMap = new HashMap<>();
+            for (TopicPartition tp : records.partitions()) {
+                if (tp.partition() != filterPartition)
+                    recordMap.put(tp, records.records(tp));
+            }
+            return new ConsumerRecords<K, V>(recordMap);
+        }
+
+        @Override
+        public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+            onCommitCount++;
+            if (throwExceptionOnCommit)
+                throw new KafkaException("Injected exception in FilterConsumerInterceptor.onCommit.");
+        }
+
+        @Override
+        public void close() {
+        }
+
+        // if 'on' is true, onConsume will always throw an exception
+        public void injectOnConsumeError(boolean on) {
+            throwExceptionOnConsume = on;
+        }
+
+        // if 'on' is true, onCommit will always throw an exception
+        public void injectOnCommitError(boolean on) {
+            throwExceptionOnCommit = on;
+        }
+    }
+
+    @Test
+    public void testOnConsumeChain() {
+        List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks
+        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
+        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+
+        // verify that onConsume modifies ConsumerRecords
+        Map<TopicPartition, List<ConsumerRecord<Integer, Integer>>> records = new HashMap<>();
+        List<ConsumerRecord<Integer, Integer>> list1 = new ArrayList<>();
+        list1.add(consumerRecord);
+        List<ConsumerRecord<Integer, Integer>> list2 = new ArrayList<>();
+        list2.add(new ConsumerRecord<>(filterTopicPart1.topic(), filterTopicPart1.partition(), 0, 1, 1));
+        List<ConsumerRecord<Integer, Integer>> list3 = new ArrayList<>();
+        list3.add(new ConsumerRecord<>(filterTopicPart2.topic(), filterTopicPart2.partition(), 0, 1, 1));
+        records.put(tp, list1);
+        records.put(filterTopicPart1, list2);
+        records.put(filterTopicPart2, list3);
+        ConsumerRecords<Integer, Integer> consumerRecords = new ConsumerRecords<>(records);
+        ConsumerRecords<Integer, Integer> interceptedRecords = interceptors.onConsume(consumerRecords);
+        assertEquals(1, interceptedRecords.count());
+        assertTrue(interceptedRecords.partitions().contains(tp));
+        assertFalse(interceptedRecords.partitions().contains(filterTopicPart1));
+        assertFalse(interceptedRecords.partitions().contains(filterTopicPart2));
+        assertEquals(2, onConsumeCount);
+
+        // verify that even if one of the intermediate interceptors throws an exception, all interceptors' onConsume are called
+        interceptor1.injectOnConsumeError(true);
+        ConsumerRecords<Integer, Integer> partInterceptedRecs = interceptors.onConsume(consumerRecords);
+        assertEquals(2, partInterceptedRecs.count());
+        assertTrue(partInterceptedRecs.partitions().contains(filterTopicPart1));  // since interceptor1 threw an exception
+        assertFalse(partInterceptedRecs.partitions().contains(filterTopicPart2)); // interceptor2 should still be called
+        assertEquals(4, onConsumeCount);
+
+        // if all interceptors throw an exception, records should be unmodified
+        interceptor2.injectOnConsumeError(true);
+        ConsumerRecords<Integer, Integer> noneInterceptedRecs = interceptors.onConsume(consumerRecords);
+        assertEquals(noneInterceptedRecs, consumerRecords);
+        assertEquals(3, noneInterceptedRecs.count());
+        assertEquals(6, onConsumeCount);
+
+        interceptors.close();
+    }
+
+    @Test
+    public void testOnCommitChain() {
+        List<ConsumerInterceptor<Integer, Integer>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor differently, which is not
+        // how it would be done in KafkaConsumer, but ok for testing interceptor callbacks
+        FilterConsumerInterceptor<Integer, Integer> interceptor1 = new FilterConsumerInterceptor<>(filterPartition1);
+        FilterConsumerInterceptor<Integer, Integer> interceptor2 = new FilterConsumerInterceptor<>(filterPartition2);
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ConsumerInterceptors<Integer, Integer> interceptors = new ConsumerInterceptors<>(interceptorList);
+
+        // verify that onCommit is called for all interceptors in the chain
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(tp, new OffsetAndMetadata(0));
+        interceptors.onCommit(offsets);
+        assertEquals(2, onCommitCount);
+
+        // verify that even if one of the interceptors throws an exception, all interceptors' onCommit are called
+        interceptor1.injectOnCommitError(true);
+        interceptors.onCommit(offsets);
+        assertEquals(4, onCommitCount);
+
+        interceptors.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 1130225..2dada8c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
 import org.junit.Assert;
 import org.junit.Test;
@@ -70,4 +72,27 @@ public class KafkaProducerTest {
         Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
         Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
     }
+
+    @Test
+    public void testInterceptorConstructClose() throws Exception {
+        try {
+            Properties props = new Properties();
+            // test with client ID assigned by KafkaProducer
+            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MockProducerInterceptor.class.getName());
+            props.setProperty(MockProducerInterceptor.APPEND_STRING_PROP, "something");
+
+            KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
+            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
+
+            producer.close();
+            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
+        } finally {
+            // cleanup, since we use mutable static variables in MockProducerInterceptor
+            MockProducerInterceptor.resetCounters();
+        }
+    }
 }
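
The lifecycle exercised here: interceptors named in interceptor.classes are
instantiated and configure()d while the KafkaProducer is constructed, and
their close() runs from producer.close(). A minimal registration sketch
(broker address and interceptor class name are illustrative, not from this
patch):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RegistrationSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
            props.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                    "com.example.MyProducerInterceptor"); // hypothetical interceptor class
            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            // every send() now passes through onSend(); acknowledgements
            // flow back through onAcknowledgement()
            producer.close(); // also invokes the interceptor's close()
        }
    }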

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
new file mode 100644
index 0000000..18a455f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ProducerInterceptorsTest {
+    private final TopicPartition tp = new TopicPartition("test", 0);
+    private final ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("test", 0, 1, "value");
+    private int onAckCount = 0;
+    private int onSendCount = 0;
+
+    private class AppendProducerInterceptor implements ProducerInterceptor<Integer, String> {
+        private String appendStr = "";
+        private boolean throwExceptionOnSend = false;
+        private boolean throwExceptionOnAck = false;
+
+        public AppendProducerInterceptor(String appendStr) {
+            this.appendStr = appendStr;
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+
+        @Override
+        public ProducerRecord<Integer, String> onSend(ProducerRecord<Integer, String> record) {
+            onSendCount++;
+            if (throwExceptionOnSend)
+                throw new KafkaException("Injected exception in AppendProducerInterceptor.onSend");
+
+            ProducerRecord<Integer, String> newRecord = new ProducerRecord<>(
+                    record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
+            return newRecord;
+        }
+
+        @Override
+        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+            onAckCount++;
+            if (throwExceptionOnAck)
+                throw new KafkaException("Injected exception in AppendProducerInterceptor.onAcknowledgement");
+        }
+
+        @Override
+        public void close() {
+        }
+
+        // if 'on' is true, onSend will always throw an exception
+        public void injectOnSendError(boolean on) {
+            throwExceptionOnSend = on;
+        }
+
+        // if 'on' is true, onAcknowledgement will always throw an exception
+        public void injectOnAcknowledgementError(boolean on) {
+            throwExceptionOnAck = on;
+        }
+    }
+
+    @Test
+    public void testOnSendChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor class differently, which is not
+        // how it would be done in KafkaProducer, but is fine for testing interceptor callbacks
+        AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
+        AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+
+        // verify that onSend() mutates the record as expected
+        ProducerRecord<Integer, String> interceptedRecord = interceptors.onSend(producerRecord);
+        assertEquals(2, onSendCount);
+        assertEquals(producerRecord.topic(), interceptedRecord.topic());
+        assertEquals(producerRecord.partition(), interceptedRecord.partition());
+        assertEquals(producerRecord.key(), interceptedRecord.key());
+        assertEquals(producerRecord.value().concat("One").concat("Two"), interceptedRecord.value());
+
+        // verify that onSend() mutates the same record the same way on a second pass
+        ProducerRecord<Integer, String> anotherRecord = interceptors.onSend(producerRecord);
+        assertEquals(4, onSendCount);
+        assertEquals(interceptedRecord, anotherRecord);
+
+        // verify that if one of the interceptors throws an exception, the other interceptors' callbacks are still called
+        interceptor1.injectOnSendError(true);
+        ProducerRecord<Integer, String> partInterceptRecord = interceptors.onSend(producerRecord);
+        assertEquals(6, onSendCount);
+        assertEquals(producerRecord.value().concat("Two"), partInterceptRecord.value());
+
+        // verify that the record remains valid if every onSend throws an exception
+        interceptor2.injectOnSendError(true);
+        ProducerRecord<Integer, String> noInterceptRecord = interceptors.onSend(producerRecord);
+        assertEquals(producerRecord, noInterceptRecord);
+
+        interceptors.close();
+    }
+
+    @Test
+    public void testOnAcknowledgementChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new ArrayList<>();
+        // we are testing two different interceptors by configuring the same interceptor class differently, which is not
+        // how it would be done in KafkaProducer, but is fine for testing interceptor callbacks
+        AppendProducerInterceptor interceptor1 = new AppendProducerInterceptor("One");
+        AppendProducerInterceptor interceptor2 = new AppendProducerInterceptor("Two");
+        interceptorList.add(interceptor1);
+        interceptorList.add(interceptor2);
+        ProducerInterceptors<Integer, String> interceptors = new ProducerInterceptors<>(interceptorList);
+
+        // verify onAck is called on all interceptors
+        RecordMetadata meta = new RecordMetadata(tp, 0, 0);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(2, onAckCount);
+
+        // verify that onAcknowledgement exceptions do not propagate
+        interceptor1.injectOnAcknowledgementError(true);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(4, onAckCount);
+
+        interceptor2.injectOnAcknowledgementError(true);
+        interceptors.onAcknowledgement(meta, null);
+        assertEquals(6, onAckCount);
+
+        interceptors.close();
+    }
+}
+
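
testOnSendChain asserts the producer-side analogue: onSend calls are chained
in list order, each interceptor receiving the previous one's output, and an
exception drops only that interceptor's mutation rather than failing the
send. A sketch of that behavior, again illustrative rather than the actual
ProducerInterceptors code:

    import java.util.List;
    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class ProducerChainSketch<K, V> {
        private final List<ProducerInterceptor<K, V>> interceptors;

        ProducerChainSketch(List<ProducerInterceptor<K, V>> interceptors) {
            this.interceptors = interceptors;
        }

        ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {
            ProducerRecord<K, V> intercepted = record;
            for (ProducerInterceptor<K, V> interceptor : interceptors) {
                try {
                    intercepted = interceptor.onSend(intercepted);
                } catch (Exception e) {
                    // swallow, so send() proceeds with the last good record
                    System.err.println("Error in onSend callback: " + e);
                }
            }
            return intercepted;
        }
    }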

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
new file mode 100644
index 0000000..8295b54
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+
+import org.apache.kafka.clients.consumer.ConsumerInterceptor;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0);
+
+    public MockConsumerInterceptor() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // clientId must be in configs
+        Object clientIdValue = configs.get(ConsumerConfig.CLIENT_ID_CONFIG);
+        if (clientIdValue == null)
+            throw new ConfigException("Mock consumer interceptor expects 
configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+    }
+
+    @Override
+    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
+        for (TopicPartition tp : records.partitions()) {
+            List<ConsumerRecord<String, String>> lst = new ArrayList<>();
+            for (ConsumerRecord<String, String> record : records.records(tp)) {
+                lst.add(new ConsumerRecord<>(record.topic(), record.partition(), record.offset(), record.key(), record.value().toUpperCase()));
+            }
+            recordMap.put(tp, lst);
+        }
+        return new ConsumerRecords<>(recordMap);
+    }
+
+    @Override
+    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
+        ON_COMMIT_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    public static void resetCounters() {
+        INIT_COUNT.set(0);
+        CLOSE_COUNT.set(0);
+        ON_COMMIT_COUNT.set(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
new file mode 100644
index 0000000..cee1247
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerInterceptor;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+public class MockProducerInterceptor implements ProducerInterceptor<String, String> {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
+    public static final String APPEND_STRING_PROP = "mock.interceptor.append";
+    private String appendStr;
+
+    public MockProducerInterceptor() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // ensure this method is called and expected configs are passed in
+        Object o = configs.get(APPEND_STRING_PROP);
+        if (o == null)
+            throw new ConfigException("Mock producer interceptor expects 
configuration " + APPEND_STRING_PROP);
+        if (o != null && o instanceof String)
+            appendStr = (String) o;
+
+        // clientId also must be in configs
+        Object clientIdValue = configs.get(ProducerConfig.CLIENT_ID_CONFIG);
+        if (clientIdValue == null)
+            throw new ConfigException("Mock producer interceptor expects 
configuration " + ProducerConfig.CLIENT_ID_CONFIG);
+    }
+
+    @Override
+    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
+        ONSEND_COUNT.incrementAndGet();
+        ProducerRecord<String, String> newRecord = new ProducerRecord<>(
+                record.topic(), record.partition(), record.key(), record.value().concat(appendStr));
+        return newRecord;
+    }
+
+    @Override
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+        if (exception != null)
+            ON_ERROR_COUNT.incrementAndGet();
+        else if (metadata != null)
+            ON_SUCCESS_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    public static void resetCounters() {
+        INIT_COUNT.set(0);
+        CLOSE_COUNT.set(0);
+        ONSEND_COUNT.set(0);
+        ON_SUCCESS_COUNT.set(0);
+        ON_ERROR_COUNT.set(0);
+    }
+}
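
Both mocks are wired into the integration tests below purely through
configuration. Per KIP-42 the interceptor.classes property takes an ordered
list, so several interceptors can be chained; a configuration fragment (the
second class name is hypothetical):

    Properties props = new Properties();
    props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
            "org.apache.kafka.test.MockConsumerInterceptor,com.example.AuditInterceptor");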

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index eb24706..bc3a6ce 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -290,16 +290,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
                                       startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
-    val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
-    val maxIters = numRecords * 300
-    var iters = 0
-    while (records.size < numRecords) {
-      for (record <- consumer.poll(50).asScala)
-        records.add(record)
-      if (iters > maxIters)
-        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
-      iters += 1
-    }
+    val records = consumeRecords(consumer, numRecords)
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
@@ -312,11 +303,25 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     }
   }
 
-  protected def awaitCommitCallback(consumer: Consumer[Array[Byte], Array[Byte]], commitCallback: CountConsumerCommitCallback): Unit = {
+  protected def consumeRecords[K, V](consumer: Consumer[K, V], numRecords: Int): ArrayList[ConsumerRecord[K, V]] = {
+    val records = new ArrayList[ConsumerRecord[K, V]]
+    val maxIters = numRecords * 300
+    var iters = 0
+    while (records.size < numRecords) {
+      for (record <- consumer.poll(50).asScala)
+        records.add(record)
+      if (iters > maxIters)
+        throw new IllegalStateException("Failed to consume the expected records after " + iters + " iterations.")
+      iters += 1
+    }
+    records
+  }
+
+  protected def awaitCommitCallback[K, V](consumer: Consumer[K, V], commitCallback: CountConsumerCommitCallback): Unit = {
     val startCount = commitCallback.count
     val started = System.currentTimeMillis()
     while (commitCallback.count == startCount && System.currentTimeMillis() - started < 10000)
-      this.consumers(0).poll(50)
+      consumer.poll(50)
     assertEquals(startCount + 1, commitCallback.count)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d5b43b19/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 6711edf..b2f96e5 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,17 +12,21 @@
   */
 package kafka.api
 
+
+import java.util
 import java.util.Properties
+
 import java.util.regex.Pattern
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.CompressionType
-import org.apache.kafka.common.serialization.ByteArrayDeserializer
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer, ByteArrayDeserializer, ByteArraySerializer}
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
+import org.apache.kafka.test.{MockProducerInterceptor, MockConsumerInterceptor}
 import org.junit.Assert._
 import org.junit.Test
 import scala.collection.mutable.Buffer
@@ -522,6 +526,144 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     runMultiConsumerSessionTimeoutTest(true)
   }
 
+  @Test
+  def testInterceptors() {
+    val appendStr = "mock"
+    // create producer with interceptor
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put("mock.interceptor.append", appendStr)
+    val testProducer = new KafkaProducer[String, String](producerProps, new StringSerializer, new StringSerializer)
+
+    // produce records
+    val numRecords = 10
+    (0 until numRecords).map { i =>
+      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
+    }.foreach(_.get)
+    assertEquals(numRecords, MockProducerInterceptor.ONSEND_COUNT.intValue())
+    assertEquals(numRecords, MockProducerInterceptor.ON_SUCCESS_COUNT.intValue())
+    // send an invalid record
+    try {
+      testProducer.send(null, null)
+      fail("Should not allow sending a null record")
+    } catch {
+      case e: Throwable => assertEquals("Interceptor should be notified about the exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue())
+    }
+
+    // create a consumer with the interceptor
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+    val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer())
+    testConsumer.assign(List(tp).asJava)
+    testConsumer.seek(tp, 0)
+
+    // consume and verify that values are modified by interceptors
+    val records = consumeRecords(testConsumer, numRecords)
+    for (i <- 0 until numRecords) {
+      val record = records.get(i)
+      assertEquals(s"key $i", new String(record.key()))
+      assertEquals(s"value $i$appendStr".toUpperCase, new String(record.value()))
+    }
+
+    // commit sync and verify that onCommit is called
+    val commitCountBefore = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    testConsumer.commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(2L))).asJava)
+    assertEquals(2, testConsumer.committed(tp).offset)
+    assertEquals(commitCountBefore + 1, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+
+    // commit async and verify that onCommit is called
+    val commitCallback = new CountConsumerCommitCallback()
+    testConsumer.commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(5L))).asJava, commitCallback)
+    awaitCommitCallback(testConsumer, commitCallback)
+    assertEquals(5, testConsumer.committed(tp).offset)
+    assertEquals(commitCountBefore + 2, MockConsumerInterceptor.ON_COMMIT_COUNT.intValue())
+
+    testConsumer.close()
+    testProducer.close()
+
+    // cleanup
+    MockConsumerInterceptor.resetCounters()
+    MockProducerInterceptor.resetCounters()
+  }
+
+  @Test
+  def testAutoCommitIntercept() {
+    val topic2 = "topic2"
+    TestUtils.createTopic(this.zkUtils, topic2, 2, serverCount, this.servers)
+
+    // produce records
+    val numRecords = 100
+    val testProducer = new KafkaProducer[String, String](this.producerConfig, new StringSerializer, new StringSerializer)
+    (0 until numRecords).map { i =>
+      testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i", s"value $i"))
+    }.foreach(_.get)
+
+    // create a consumer with the interceptor
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+    val testConsumer = new KafkaConsumer[String, String](this.consumerConfig, new StringDeserializer(), new StringDeserializer())
+    val rebalanceListener = new ConsumerRebalanceListener {
+      override def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) = {
+        // keep partitions paused in this test so that we can verify the commits based on specific seeks
+        partitions.asScala.foreach(testConsumer.pause(_))
+      }
+
+      override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) = {}
+    }
+    changeConsumerSubscriptionAndValidateAssignment(testConsumer, List(topic), Set(tp, tp2), rebalanceListener)
+    testConsumer.seek(tp, 10)
+    testConsumer.seek(tp2, 20)
+
+    // change the subscription to trigger a rebalance
+    val commitCountBeforeRebalance = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    changeConsumerSubscriptionAndValidateAssignment(testConsumer,
+                                                    List(topic, topic2),
+                                                    Set(tp, tp2, new TopicPartition(topic2, 0), new TopicPartition(topic2, 1)),
+                                                    rebalanceListener)
+
+    // after rebalancing, we should have reset to the committed positions
+    assertEquals(10, testConsumer.committed(tp).offset)
+    assertEquals(20, testConsumer.committed(tp2).offset)
+    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeRebalance)
+
+    // verify that commits are intercepted on close
+    val commitCountBeforeClose = MockConsumerInterceptor.ON_COMMIT_COUNT.intValue()
+    testConsumer.close()
+    assertTrue(MockConsumerInterceptor.ON_COMMIT_COUNT.intValue() > commitCountBeforeClose)
+    testProducer.close()
+
+    // cleanup
+    MockConsumerInterceptor.resetCounters()
+  }
+
+  @Test
+  def testInterceptorsWithWrongKeyValue() {
+    val appendStr = "mock"
+    // create a producer with an interceptor whose key and value types differ from the producer's
+    val producerProps = new Properties()
+    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
+    producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
+    producerProps.put("mock.interceptor.append", appendStr)
+    val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+
+    // producing records should succeed
+    testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), "key".getBytes, "value will not be modified".getBytes))
+
+    // create a consumer with an interceptor whose key and value types differ from the consumer's
+    this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
+    val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    testConsumer.assign(List(tp).asJava)
+    testConsumer.seek(tp, 0)
+
+    // consume and verify that values are not modified by interceptors -- their exceptions are caught and logged, but not propagated
+    val records = consumeRecords(testConsumer, 1)
+    val record = records.get(0)
+    assertEquals("value will not be modified", new String(record.value()))
+
+    testConsumer.close()
+    testProducer.close()
+  }
+
   def runMultiConsumerSessionTimeoutTest(closeConsumer: Boolean): Unit = {
     // use consumers defined in this class plus one additional consumer
     // Use topic defined in this class + one additional topic
@@ -705,4 +847,15 @@ class PlaintextConsumerTest extends BaseConsumerTest {
         s"Did not get valid assignment for partitions ${subscriptions.asJava} 
after we changed subscription")
   }
 
+  def changeConsumerSubscriptionAndValidateAssignment[K, V](consumer: Consumer[K, V],
+                                                            topicsToSubscribe: List[String],
+                                                            subscriptions: Set[TopicPartition],
+                                                            rebalanceListener: ConsumerRebalanceListener): Unit = {
+    consumer.subscribe(topicsToSubscribe.asJava, rebalanceListener)
+    TestUtils.waitUntilTrue(() => {
+      consumer.poll(50)
+      consumer.assignment() == subscriptions.asJava
+    }, s"Expected partitions ${subscriptions.asJava} but actually got ${consumer.assignment()}")
+  }
+
 }
