KAFKA-4093; Cluster Id (KIP-78)

This PR implements KIP-78: Cluster Identifiers
[(link)](https://cwiki.apache.org/confluence/display/KAFKA/KIP-78%3A+Cluster+Id#KIP-78:ClusterId-Overview)
 and includes the following changes:

1. Changes to broker code
        - generate cluster id and store it in Zookeeper
        - update protocol to add cluster id to metadata request and response
        - add ClusterResourceListener interface, ClusterResource class and 
ClusterResourceListeners utility class
        - send ClusterResource events to the metric reporters
2. Changes to client code
        - update Cluster and Metadata code to support cluster id
        - update clients for sending ClusterResource events to interceptors, 
(de)serializers and metric reporters
3. Integration tests for interceptors, (de)serializers and metric reporters for 
clients, and for protocol changes and metric reporters for the broker.
4. System tests for upgrading from previous versions.

Author: Sumit Arrawatia <sumit.arrawa...@gmail.com>
Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #1830 from arrawatia/kip-78


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ecc1fb10
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ecc1fb10
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ecc1fb10

Branch: refs/heads/trunk
Commit: ecc1fb10fad39c2da82731a36552cb5dd9ac2858
Parents: d7bffeb
Author: Sumit Arrawatia <sumit.arrawa...@gmail.com>
Authored: Sat Sep 17 04:10:13 2016 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Sat Sep 17 07:53:25 2016 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../java/org/apache/kafka/clients/Metadata.java |  32 ++-
 .../clients/consumer/ConsumerInterceptor.java   |   2 +
 .../kafka/clients/consumer/KafkaConsumer.java   |  59 +++--
 .../kafka/clients/producer/KafkaProducer.java   |  62 +++--
 .../clients/producer/ProducerInterceptor.java   |   2 +
 .../java/org/apache/kafka/common/Cluster.java   |  32 ++-
 .../apache/kafka/common/ClusterResource.java    |  50 +++++
 .../kafka/common/ClusterResourceListener.java   |  50 +++++
 .../internals/ClusterResourceListeners.java     |  63 ++++++
 .../kafka/common/metrics/MetricsReporter.java   |   2 +
 .../apache/kafka/common/protocol/Protocol.java  |  16 +-
 .../kafka/common/requests/MetadataRequest.java  |   3 +-
 .../kafka/common/requests/MetadataResponse.java |  32 ++-
 .../common/serialization/Deserializer.java      |   2 +
 .../kafka/common/serialization/Serializer.java  |   2 +
 .../org/apache/kafka/clients/MetadataTest.java  |  44 +++-
 .../clients/consumer/KafkaConsumerTest.java     |   3 +
 .../clients/consumer/internals/FetcherTest.java |   2 +-
 .../clients/producer/KafkaProducerTest.java     |   6 +-
 .../clients/producer/MockProducerTest.java      |  17 +-
 .../internals/DefaultPartitionerTest.java       |   4 +-
 .../internals/RecordAccumulatorTest.java        |   3 +-
 .../clients/producer/internals/SenderTest.java  |   3 +-
 .../common/requests/RequestResponseTest.java    |  12 +-
 .../kafka/test/MockClusterResourceListener.java |  39 ++++
 .../kafka/test/MockConsumerInterceptor.java     |  23 +-
 .../org/apache/kafka/test/MockDeserializer.java |  59 +++++
 .../apache/kafka/test/MockMetricsReporter.java  |   4 +-
 .../kafka/test/MockProducerInterceptor.java     |  22 +-
 .../org/apache/kafka/test/MockSerializer.java   |  16 +-
 .../java/org/apache/kafka/test/TestUtils.java   |  41 +++-
 .../kafka/metrics/KafkaMetricsReporter.scala    |  12 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |   4 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  58 +++--
 .../kafka/server/KafkaServerStartable.scala     |   8 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala |  25 ++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  30 +++
 .../kafka/api/EndToEndClusterIdTest.scala       | 225 +++++++++++++++++++
 .../kafka/api/PlaintextConsumerTest.scala       |   3 +
 .../scala/unit/kafka/metrics/MetricsTest.scala  |  11 +-
 .../KafkaMetricReporterClusterIdTest.scala      |  93 ++++++++
 .../unit/kafka/server/MetadataRequestTest.scala |  17 +-
 .../server/ServerGenerateClusterIdTest.scala    | 140 ++++++++++++
 .../test/scala/unit/kafka/utils/UtilsTest.scala |  28 ++-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    |   6 +
 .../WindowedStreamPartitionerTest.java          |   4 +-
 .../processor/DefaultPartitionGrouperTest.java  |   2 +-
 .../internals/RecordCollectorTest.java          |   2 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamThreadTest.java   |   2 +-
 tests/kafkatest/tests/core/upgrade_test.py      |  21 +-
 tests/kafkatest/version.py                      |   6 +-
 vagrant/base.sh                                 |   3 +
 54 files changed, 1265 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 632b516..7716f43 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -175,6 +175,7 @@
   <subpackage name="test">
     <allow pkg="org.apache.kafka" />
     <allow pkg="org.bouncycastle" />
+    <allow pkg="javax.xml.bind" />
   </subpackage>
 
   <subpackage name="connect">

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/clients/Metadata.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java 
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 5a5031e..f717001 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -3,9 +3,9 @@
  * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
  * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
  * License. You may obtain a copy of the License at
- * 
+ *
  * http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
  * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
  * specific language governing permissions and limitations under the License.
@@ -13,6 +13,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -27,13 +28,14 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 
 /**
  * A class encapsulating some of the logic around metadata.
  * <p>
  * This class is shared by the client thread (for partitioning) and the 
background sender thread.
- * 
+ *
  * Metadata is maintained for only a subset of topics, which can be added to 
over time. When we request metadata for a
  * topic we don't have any metadata for it will trigger a metadata update.
  * <p>
@@ -58,6 +60,7 @@ public final class Metadata {
     /* Topics with expiry time */
     private final Map<String, Long> topics;
     private final List<Listener> listeners;
+    private final ClusterResourceListeners clusterResourceListeners;
     private boolean needMetadataForAllTopics;
     private final boolean topicExpiryEnabled;
 
@@ -69,7 +72,7 @@ public final class Metadata {
     }
 
     public Metadata(long refreshBackoffMs, long metadataExpireMs) {
-        this(refreshBackoffMs, metadataExpireMs, false);
+        this(refreshBackoffMs, metadataExpireMs, false, new 
ClusterResourceListeners());
     }
 
     /**
@@ -78,8 +81,9 @@ public final class Metadata {
      *        polling
      * @param metadataExpireMs The maximum amount of time that metadata can be 
retained without refresh
      * @param topicExpiryEnabled If true, enable expiry of unused topics
+     * @param clusterResourceListeners List of ClusterResourceListeners which 
will receive metadata updates.
      */
-    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean 
topicExpiryEnabled) {
+    public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean 
topicExpiryEnabled, ClusterResourceListeners clusterResourceListeners) {
         this.refreshBackoffMs = refreshBackoffMs;
         this.metadataExpireMs = metadataExpireMs;
         this.topicExpiryEnabled = topicExpiryEnabled;
@@ -90,6 +94,7 @@ public final class Metadata {
         this.needUpdate = false;
         this.topics = new HashMap<>();
         this.listeners = new ArrayList<>();
+        this.clusterResourceListeners = clusterResourceListeners;
         this.needMetadataForAllTopics = false;
     }
 
@@ -189,6 +194,8 @@ public final class Metadata {
      * is set for topics if required and expired topics are removed from the 
metadata.
      */
     public synchronized void update(Cluster cluster, long now) {
+        Objects.requireNonNull(cluster, "cluster should not be null");
+
         this.needUpdate = false;
         this.lastRefreshMs = now;
         this.lastSuccessfulRefreshMs = now;
@@ -211,6 +218,7 @@ public final class Metadata {
         for (Listener listener: listeners)
             listener.onMetadataUpdate(cluster);
 
+        String previousClusterId = cluster.clusterResource().clusterId();
 
         if (this.needMetadataForAllTopics) {
             // the listener may change the interested topics, which could 
cause another metadata refresh.
@@ -221,6 +229,14 @@ public final class Metadata {
             this.cluster = cluster;
         }
 
+        // The bootstrap cluster is guaranteed not to have any useful 
information
+        if (!cluster.isBootstrapConfigured()) {
+            String clusterId = cluster.clusterResource().clusterId();
+            if (clusterId == null ? previousClusterId != null : 
!clusterId.equals(previousClusterId))
+                log.info("Cluster ID: {}", 
cluster.clusterResource().clusterId());
+            clusterResourceListeners.onUpdate(cluster.clusterResource());
+        }
+
         notifyAll();
         log.debug("Updated cluster metadata version {} to {}", this.version, 
this.cluster);
     }
@@ -232,7 +248,7 @@ public final class Metadata {
     public synchronized void failedUpdate(long now) {
         this.lastRefreshMs = now;
     }
-    
+
     /**
      * @return The current metadata version
      */
@@ -295,7 +311,9 @@ public final class Metadata {
         Collection<PartitionInfo> partitionInfos = new ArrayList<>();
         List<Node> nodes = Collections.emptyList();
         Set<String> internalTopics = Collections.emptySet();
+        String clusterId = null;
         if (cluster != null) {
+            clusterId = cluster.clusterResource().clusterId();
             internalTopics = cluster.internalTopics();
             unauthorizedTopics.addAll(cluster.unauthorizedTopics());
             unauthorizedTopics.retainAll(this.topics.keySet());
@@ -308,6 +326,6 @@ public final class Metadata {
             }
             nodes = cluster.nodes();
         }
-        return new Cluster(nodes, partitionInfos, unauthorizedTopics, 
internalTopics);
+        return new Cluster(clusterId, nodes, partitionInfos, 
unauthorizedTopics, internalTopics);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
index 70ea444..f8789fc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerInterceptor.java
@@ -32,6 +32,8 @@ import java.util.Map;
  * just log the errors.
  * <p>
  * ConsumerInterceptor callbacks are called from the same thread that invokes 
{@link org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)}.
+ * <p>
+ * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
 public interface ConsumerInterceptor<K, V> extends Configurable {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index cfa046f..8fa7ab7 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.clients.consumer.internals.SubscriptionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -607,7 +608,31 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
             reporters.add(new JmxReporter(JMX_PREFIX));
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = 
config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG));
+
+            // load interceptors and make sure they get clientId
+            Map<String, Object> userProvidedConfigs = config.originals();
+            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new 
ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ConsumerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new 
ConsumerInterceptors<>(interceptorList);
+            if (keyDeserializer == null) {
+                this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.keyDeserializer.configure(config.originals(), true);
+            } else {
+                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
+                this.keyDeserializer = keyDeserializer;
+            }
+            if (valueDeserializer == null) {
+                this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                        Deserializer.class);
+                this.valueDeserializer.configure(config.originals(), false);
+            } else {
+                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
+                this.valueDeserializer = valueDeserializer;
+            }
+            ClusterResourceListeners clusterResourceListeners = 
configureClusterResourceListeners(keyDeserializer, valueDeserializer, 
reporters, interceptorList);
+            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, 
clusterResourceListeners);
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 0);
             String metricGrpPrefix = "consumer";
@@ -628,12 +653,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
             List<PartitionAssignor> assignors = config.getConfiguredInstances(
                     ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                     PartitionAssignor.class);
-            // load interceptors and make sure they get clientId
-            Map<String, Object> userProvidedConfigs = config.originals();
-            userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
-            List<ConsumerInterceptor<K, V>> interceptorList = (List) (new 
ConsumerConfig(userProvidedConfigs)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    ConsumerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new 
ConsumerInterceptors<>(interceptorList);
             this.coordinator = new ConsumerCoordinator(this.client,
                     config.getString(ConsumerConfig.GROUP_ID_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG),
@@ -651,22 +670,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> 
{
                     
config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                     this.interceptors,
                     
config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
-            if (keyDeserializer == null) {
-                this.keyDeserializer = 
config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
-                this.keyDeserializer.configure(config.originals(), true);
-            } else {
-                config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
-                this.keyDeserializer = keyDeserializer;
-            }
-            if (valueDeserializer == null) {
-                this.valueDeserializer = 
config.getConfiguredInstance(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                        Deserializer.class);
-                this.valueDeserializer.configure(config.originals(), false);
-            } else {
-                config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
-                this.valueDeserializer = valueDeserializer;
-            }
             this.fetcher = new Fetcher<>(this.client,
                     config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
@@ -1412,6 +1415,16 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
         this.client.wakeup();
     }
 
+    private ClusterResourceListeners 
configureClusterResourceListeners(Deserializer<K> keyDeserializer, 
Deserializer<V> valueDeserializer, List<?>... candidateLists) {
+        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
+        for (List<?> candidateList: candidateLists)
+            clusterResourceListeners.maybeAddAll(candidateList);
+
+        clusterResourceListeners.maybeAdd(keyDeserializer);
+        clusterResourceListeners.maybeAdd(valueDeserializer);
+        return clusterResourceListeners;
+    }
+
     private void close(boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index fbc1099..3efc7b5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -31,6 +31,7 @@ import 
org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.PartitionInfo;
@@ -224,7 +225,31 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.partitioner = 
config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
Partitioner.class);
             long retryBackoffMs = 
config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
-            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true);
+            if (keySerializer == null) {
+                this.keySerializer = 
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                        Serializer.class);
+                this.keySerializer.configure(config.originals(), true);
+            } else {
+                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+                this.keySerializer = keySerializer;
+            }
+            if (valueSerializer == null) {
+                this.valueSerializer = 
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                        Serializer.class);
+                this.valueSerializer.configure(config.originals(), false);
+            } else {
+                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+                this.valueSerializer = valueSerializer;
+            }
+
+            // load interceptors and make sure they get clientId
+            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
+            List<ProducerInterceptor<K, V>> interceptorList = (List) (new 
ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
+                    ProducerInterceptor.class);
+            this.interceptors = interceptorList.isEmpty() ? null : new 
ProducerInterceptors<>(interceptorList);
+
+            ClusterResourceListeners clusterResourceListeners = 
configureClusterResourceListeners(keySerializer, valueSerializer, 
interceptorList, reporters);
+            this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, 
clusterResourceListeners);
             this.maxRequestSize = 
config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = 
config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = 
CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
@@ -272,6 +297,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     retryBackoffMs,
                     metrics,
                     time);
+
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 
time.milliseconds());
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config.values());
@@ -301,28 +327,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
 
             this.errors = this.metrics.sensor("errors");
 
-            if (keySerializer == null) {
-                this.keySerializer = 
config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
-                this.keySerializer.configure(config.originals(), true);
-            } else {
-                config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
-                this.keySerializer = keySerializer;
-            }
-            if (valueSerializer == null) {
-                this.valueSerializer = 
config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-                        Serializer.class);
-                this.valueSerializer.configure(config.originals(), false);
-            } else {
-                config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
-                this.valueSerializer = valueSerializer;
-            }
-
-            // load interceptors and make sure they get clientId
-            userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
-            List<ProducerInterceptor<K, V>> interceptorList = (List) (new 
ProducerConfig(userProvidedConfigs)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
-                    ProducerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new 
ProducerInterceptors<>(interceptorList);
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
@@ -712,12 +716,22 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             throw new KafkaException("Failed to close kafka producer", 
firstException.get());
     }
 
+    private ClusterResourceListeners 
configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> 
valueSerializer, List<?>... candidateLists) {
+        ClusterResourceListeners clusterResourceListeners = new 
ClusterResourceListeners();
+        for (List<?> candidateList: candidateLists)
+            clusterResourceListeners.maybeAddAll(candidateList);
+
+        clusterResourceListeners.maybeAdd(keySerializer);
+        clusterResourceListeners.maybeAdd(valueSerializer);
+        return clusterResourceListeners;
+    }
+
     /**
      * computes partition for given record.
      * if the record has partition returns the value otherwise
      * calls configured partitioner class to compute the partition.
      */
-    private int partition(ProducerRecord<K, V> record, byte[] serializedKey , 
byte[] serializedValue, Cluster cluster) {
+    private int partition(ProducerRecord<K, V> record, byte[] serializedKey, 
byte[] serializedValue, Cluster cluster) {
         Integer partition = record.partition();
         if (partition != null) {
             List<PartitionInfo> partitions = 
cluster.partitionsForTopic(record.topic());

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index e835a69..96643d3 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -31,6 +31,8 @@ import org.apache.kafka.common.Configurable;
  * just log the errors.
  * <p>
  * ProducerInterceptor callbacks may be called from multiple threads. 
Interceptor implementation must ensure thread-safety, if needed.
+ * <p>
+ * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
 public interface ProducerInterceptor<K, V> extends Configurable {
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/Cluster.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java 
b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 31447c5..471ae26 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -38,39 +38,43 @@ public final class Cluster {
     private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
     private final Map<Integer, Node> nodesById;
+    private final ClusterResource clusterResource;
 
     /**
      * Create a new cluster with the given nodes and partitions
      * @param nodes The nodes in the cluster
      * @param partitions Information about a subset of the topic-partitions 
this cluster hosts
-     * @deprecated Use the Cluster constructor with 4 parameters
+     * @deprecated Use the Cluster constructor with 5 parameters
      */
     @Deprecated
     public Cluster(Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics) {
-        this(false, nodes, partitions, unauthorizedTopics, 
Collections.<String>emptySet());
+        this(null, false, nodes, partitions, unauthorizedTopics, 
Collections.<String>emptySet());
     }
 
+
     /**
-     * Create a new cluster with the given nodes and partitions
+     * Create a new cluster with the given id, nodes and partitions
      * @param nodes The nodes in the cluster
      * @param partitions Information about a subset of the topic-partitions 
this cluster hosts
      */
-    public Cluster(Collection<Node> nodes,
+    public Cluster(String clusterId,
+                   Collection<Node> nodes,
                    Collection<PartitionInfo> partitions,
                    Set<String> unauthorizedTopics,
                    Set<String> internalTopics) {
-        this(false, nodes, partitions, unauthorizedTopics, internalTopics);
+        this(clusterId, false, nodes, partitions, unauthorizedTopics, 
internalTopics);
     }
 
-    private Cluster(boolean isBootstrapConfigured,
+    private Cluster(String clusterId,
+                    boolean isBootstrapConfigured,
                     Collection<Node> nodes,
                     Collection<PartitionInfo> partitions,
                     Set<String> unauthorizedTopics,
                     Set<String> internalTopics) {
         this.isBootstrapConfigured = isBootstrapConfigured;
-
+        this.clusterResource = new ClusterResource(clusterId);
         // make a randomized, unmodifiable copy of the nodes
         List<Node> copy = new ArrayList<>(nodes);
         Collections.shuffle(copy);
@@ -128,7 +132,7 @@ public final class Cluster {
      * Create an empty cluster instance with no nodes and no topic-partitions.
      */
     public static Cluster empty() {
-        return new Cluster(new ArrayList<Node>(0), new 
ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
+        return new Cluster(null, new ArrayList<Node>(0), new 
ArrayList<PartitionInfo>(0), Collections.<String>emptySet(),
                 Collections.<String>emptySet());
     }
 
@@ -142,7 +146,7 @@ public final class Cluster {
         int nodeId = -1;
         for (InetSocketAddress address : addresses)
             nodes.add(new Node(nodeId--, address.getHostString(), 
address.getPort()));
-        return new Cluster(true, nodes, new ArrayList<PartitionInfo>(0), 
Collections.<String>emptySet(), Collections.<String>emptySet());
+        return new Cluster(null, true, nodes, new ArrayList<PartitionInfo>(0), 
Collections.<String>emptySet(), Collections.<String>emptySet());
     }
 
     /**
@@ -151,8 +155,8 @@ public final class Cluster {
     public Cluster withPartitions(Map<TopicPartition, PartitionInfo> 
partitions) {
         Map<TopicPartition, PartitionInfo> combinedPartitions = new 
HashMap<>(this.partitionsByTopicPartition);
         combinedPartitions.putAll(partitions);
-        return new Cluster(this.nodes, combinedPartitions.values(), new 
HashSet<>(this.unauthorizedTopics),
-                new HashSet<>(this.internalTopics));
+        return new Cluster(clusterResource.clusterId(), this.nodes, 
combinedPartitions.values(),
+                new HashSet<>(this.unauthorizedTopics), new 
HashSet<>(this.internalTopics));
     }
 
     /**
@@ -250,9 +254,13 @@ public final class Cluster {
         return isBootstrapConfigured;
     }
 
+    public ClusterResource clusterResource() {
+        return clusterResource;
+    }
+
     @Override
     public String toString() {
-        return "Cluster(nodes = " + this.nodes + ", partitions = " + 
this.partitionsByTopicPartition.values() + ")";
+        return "Cluster(id = " + clusterResource.clusterId() + ", nodes = " + 
this.nodes + ", partitions = " + this.partitionsByTopicPartition.values() + ")";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/ClusterResource.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/ClusterResource.java 
b/clients/src/main/java/org/apache/kafka/common/ClusterResource.java
new file mode 100644
index 0000000..29a87d4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ClusterResource.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+
+/**
+ * The <code>ClusterResource</code> class encapsulates metadata for a Kafka 
cluster.
+ */
+public class ClusterResource {
+
+    private final String clusterId;
+
+    /**
+     * Create {@link ClusterResource} with a cluster id. Note that cluster id 
may be {@code null} if the
+     * metadata request was sent to a broker without support for cluster ids. 
The first version of Kafka
+     * to support cluster id is 0.10.1.0.
+     * @param clusterId
+     */
+    public ClusterResource(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    /**
+     * Return the cluster id. Note that it may be {@code null} if the metadata 
request was sent to a broker without
+     * support for cluster ids. The first version of Kafka to support cluster 
id is 0.10.1.0.
+     */
+    public String clusterId() {
+        return clusterId;
+    }
+
+    @Override
+    public String toString() {
+        return "ClusterResource(clusterId=" + clusterId + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java 
b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
new file mode 100644
index 0000000..71de534
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/ClusterResourceListener.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.common;
+
+/**
+ * A callback interface that users can implement when they wish to get 
notified about changes in the Cluster metadata.
+ * <p>
+ * Users who need access to cluster metadata in interceptors, metric 
reporters, serializers and deserializers
+ * can implement this interface. The order of method calls for each of these 
types is described below.
+ * <p>
+ * <h4>Clients</h4>
+ * There will be one invocation of {@link 
ClusterResourceListener#onUpdate(ClusterResource)} after each metadata response.
+ * Note that the cluster id may be null when the Kafka broker version is below 
0.10.1.0. If you receive a null cluster id, you can expect it to always be null 
unless you have a cluster with multiple broker versions which can happen if the 
cluster is being upgraded while the client is running.
+ * <p>
+ * {@link org.apache.kafka.clients.producer.ProducerInterceptor} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after 
{@link 
org.apache.kafka.clients.producer.ProducerInterceptor#onSend(ProducerRecord)}
+ * but before {@link 
org.apache.kafka.clients.producer.ProducerInterceptor#onAcknowledgement(RecordMetadata,
 Exception)} .
+ * <p>
+ * {@link org.apache.kafka.clients.consumer.ConsumerInterceptor} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked 
before {@link 
org.apache.kafka.clients.consumer.ConsumerInterceptor#onConsume(ConsumerRecords)}
+ * <p>
+ * {@link org.apache.kafka.common.serialization.Serializer} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked 
before {@link 
org.apache.kafka.common.serialization.Serializer#serialize(String, Object)}
+ * <p>
+ * {@link org.apache.kafka.common.serialization.Deserializer} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked 
before {@link 
org.apache.kafka.common.serialization.Deserializer#deserialize(String, byte[])}
+ * <p>
+ * {@link org.apache.kafka.common.metrics.MetricsReporter} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked after 
first {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)} 
invocation for Producer metrics reporter
+ * and after first {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#poll(long)} invocation for 
Consumer metrics reporters. The reporter may receive metric events from the 
network layer before this method is invoked.
+ * <h4>Broker</h4>
+ * There is a single invocation {@link 
ClusterResourceListener#onUpdate(ClusterResource)} on broker start-up and the 
cluster metadata will never change.
+ * <p>
+ * KafkaMetricsReporter : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked 
during the bootup of the Kafka broker. The reporter may receive metric events 
from the network layer before this method is invoked.
+ * <p>
+ * {@link org.apache.kafka.common.metrics.MetricsReporter} : The {@link 
ClusterResourceListener#onUpdate(ClusterResource)} method will be invoked 
during the bootup of the Kafka broker. The reporter may receive metric events 
from the network layer before this method is invoked.
+ */
+public interface ClusterResourceListener {
+    /**
+     * A callback method that a user can implement to get updates for {@link 
ClusterResource}.
+     * @param clusterResource cluster metadata
+     */
+    void onUpdate(ClusterResource clusterResource);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/internals/ClusterResourceListeners.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/ClusterResourceListeners.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/ClusterResourceListeners.java
new file mode 100644
index 0000000..cab29cc
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/ClusterResourceListeners.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.internals;
+
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ClusterResourceListeners {
+
+    private final List<ClusterResourceListener> clusterResourceListeners;
+
+    public ClusterResourceListeners() {
+        this.clusterResourceListeners = new ArrayList<>();
+    }
+
+    /**
+     * Add only if the candidate implements {@link ClusterResourceListener}.
+     * @param candidate Object which might implement {@link 
ClusterResourceListener}
+     */
+    public void maybeAdd(Object candidate) {
+        if (candidate instanceof ClusterResourceListener) {
+            clusterResourceListeners.add((ClusterResourceListener) candidate);
+        }
+    }
+
+    /**
+     * Add all items who implement {@link ClusterResourceListener} from the 
list.
+     * @param candidateList List of objects which might implement {@link 
ClusterResourceListener}
+     */
+    public void maybeAddAll(List<?> candidateList) {
+        for (Object candidate : candidateList) {
+            this.maybeAdd(candidate);
+        }
+    }
+
+    /**
+     * Send the updated cluster metadata to all {@link 
ClusterResourceListener}.
+     * @param cluster Cluster metadata
+     */
+    public void onUpdate(ClusterResource cluster) {
+        for (ClusterResourceListener clusterResourceListener : 
clusterResourceListeners) {
+            clusterResourceListener.onUpdate(cluster);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index e2a1d80..ab75813 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -18,6 +18,8 @@ import org.apache.kafka.common.Configurable;
 
 /**
  * A plugin interface to allow things to listen as new metrics are created so 
they can be reported.
+ * <p>
+ * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
 public interface MetricsReporter extends Configurable {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 313477f..bda4757 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -60,6 +60,9 @@ public class Protocol {
                                                                           
ArrayOf.nullable(STRING),
                                                                           "An 
array of topics to fetch metadata for. If the topics array is null fetch 
metadata for all topics."));
 
+    /* The v2 metadata request is the same as v1. An additional field for 
cluster id has been added to the v2 metadata response */
+    public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1;
+
     public static final Schema METADATA_BROKER_V0 = new Schema(new 
Field("node_id", INT32, "The broker id."),
                                                    new Field("host", STRING, 
"The hostname of the broker."),
                                                    new Field("port", INT32,
@@ -116,8 +119,17 @@ public class Protocol {
                                                                      "The 
broker id of the controller broker."),
                                                                  new 
Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
 
-    public static final Schema[] METADATA_REQUEST = new Schema[] 
{METADATA_REQUEST_V0, METADATA_REQUEST_V1};
-    public static final Schema[] METADATA_RESPONSE = new Schema[] 
{METADATA_RESPONSE_V0, METADATA_RESPONSE_V1};
+    public static final Schema METADATA_RESPONSE_V2 = new Schema(new 
Field("brokers", new ArrayOf(METADATA_BROKER_V1),
+                                                                    "Host and 
port information for all brokers."),
+                                                                 new 
Field("cluster_id", NULLABLE_STRING,
+                                                                     "The 
cluster id that this broker belongs to."),
+                                                                 new 
Field("controller_id", INT32,
+                                                                     "The 
broker id of the controller broker."),
+                                                                 new 
Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1)));
+
+
+    public static final Schema[] METADATA_REQUEST = new Schema[] 
{METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2};
+    public static final Schema[] METADATA_RESPONSE = new Schema[] 
{METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2};
 
     /* Produce api */
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index f0cb8fc..24a9bfc 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -77,7 +77,8 @@ public class MetadataRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
             case 1:
-                return new MetadataResponse(Collections.<Node>emptyList(), 
MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId);
+            case 2:
+                return new MetadataResponse(Collections.<Node>emptyList(), 
null, MetadataResponse.NO_CONTROLLER_ID, topicMetadatas, versionId);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.METADATA.id)));

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 4bf162d..444941b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -44,6 +44,8 @@ public class MetadataResponse extends AbstractRequestResponse 
{
     private static final String CONTROLLER_ID_KEY_NAME = "controller_id";
     public static final int NO_CONTROLLER_ID = -1;
 
+    private static final String CLUSTER_ID_KEY_NAME = "cluster_id";
+
     // topic level field names
     private static final String TOPIC_ERROR_CODE_KEY_NAME = "topic_error_code";
 
@@ -78,23 +80,24 @@ public class MetadataResponse extends 
AbstractRequestResponse {
     private final Collection<Node> brokers;
     private final Node controller;
     private final List<TopicMetadata> topicMetadata;
+    private final String clusterId;
 
     /**
      * Constructor for the latest version
      */
-    public MetadataResponse(List<Node> brokers, int controllerId, 
List<TopicMetadata> topicMetadata) {
-        this(brokers, controllerId, topicMetadata, CURRENT_VERSION);
+    public MetadataResponse(List<Node> brokers, String clusterId, int 
controllerId, List<TopicMetadata> topicMetadata) {
+        this(brokers, clusterId, controllerId, topicMetadata, CURRENT_VERSION);
     }
 
     /**
      * Constructor for a specific version
      */
-    public MetadataResponse(List<Node> brokers, int controllerId, 
List<TopicMetadata> topicMetadata, int version) {
+    public MetadataResponse(List<Node> brokers, String clusterId, int 
controllerId, List<TopicMetadata> topicMetadata, int version) {
         super(new Struct(ProtoUtils.responseSchema(ApiKeys.METADATA.id, 
version)));
-
         this.brokers = brokers;
         this.controller = getControllerNode(controllerId, brokers);
         this.topicMetadata = topicMetadata;
+        this.clusterId = clusterId;
 
         List<Struct> brokerArray = new ArrayList<>();
         for (Node node : brokers) {
@@ -113,6 +116,10 @@ public class MetadataResponse extends 
AbstractRequestResponse {
         if (struct.hasField(CONTROLLER_ID_KEY_NAME))
             struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
 
+        // This field only exists in v2+
+        if (struct.hasField(CLUSTER_ID_KEY_NAME))
+            struct.set(CLUSTER_ID_KEY_NAME, clusterId);
+
         List<Struct> topicMetadataArray = new 
ArrayList<>(topicMetadata.size());
         for (TopicMetadata metadata : topicMetadata) {
             Struct topicData = struct.instance(TOPIC_METADATA_KEY_NAME);
@@ -167,6 +174,13 @@ public class MetadataResponse extends 
AbstractRequestResponse {
         if (struct.hasField(CONTROLLER_ID_KEY_NAME))
             controllerId = struct.getInt(CONTROLLER_ID_KEY_NAME);
 
+        // This field only exists in v2+
+        if (struct.hasField(CLUSTER_ID_KEY_NAME)) {
+            this.clusterId = struct.getString(CLUSTER_ID_KEY_NAME);
+        } else {
+            this.clusterId = null;
+        }
+
         List<TopicMetadata> topicMetadata = new ArrayList<>();
         Object[] topicInfos = (Object[]) struct.get(TOPIC_METADATA_KEY_NAME);
         for (int i = 0; i < topicInfos.length; i++) {
@@ -268,7 +282,7 @@ public class MetadataResponse extends 
AbstractRequestResponse {
             }
         }
 
-        return new Cluster(this.brokers, partitions, 
topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
+        return new Cluster(this.clusterId, this.brokers, partitions, 
topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED), internalTopics);
     }
 
     /**
@@ -295,6 +309,14 @@ public class MetadataResponse extends 
AbstractRequestResponse {
         return controller;
     }
 
+    /**
+     * The cluster identifier returned in the metadata response.
+     * @return cluster identifier if it is present in the response, null 
otherwise.
+     */
+    public String clusterId() {
+        return this.clusterId;
+    }
+
     public static MetadataResponse parse(ByteBuffer buffer) {
         return parse(buffer, CURRENT_VERSION);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
index d6f4498..95f6fd7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java
@@ -21,6 +21,8 @@ import java.util.Map;
  * @param <T> Type to be deserialized into.
  *
  * A class that implements this interface is expected to have a constructor 
with no parameter.
+ * <p>
+ * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
 public interface Deserializer<T> extends Closeable {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java 
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
index 88033b0..5cfcc63 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/serialization/Serializer.java
@@ -21,6 +21,8 @@ import java.util.Map;
  * @param <T> Type to be serialized from.
  *
  * A class that implements this interface is expected to have a constructor 
with no parameter.
+ * <p>
+ * Implement {@link org.apache.kafka.common.ClusterResourceListener} to 
receive cluster metadata once it's available. Please see the class 
documentation for ClusterResourceListener for more information.
  */
 public interface Serializer<T> extends Closeable {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java 
b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 5eaa737..333a072 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -12,6 +12,7 @@
  */
 package org.apache.kafka.clients;
 
+import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -20,9 +21,11 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.test.MockClusterResourceListener;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Test;
@@ -117,7 +120,7 @@ public class MetadataTest {
         assertEquals(100, metadata.lastSuccessfulUpdate());
 
         metadata.needMetadataForAllTopics(true);
-        metadata.update(null, time);
+        metadata.update(Cluster.empty(), time);
         assertEquals(100, metadata.timeToNextUpdate(1000));
     }
 
@@ -129,7 +132,7 @@ public class MetadataTest {
 
         final List<String> expectedTopics = Collections.singletonList("topic");
         metadata.setTopics(expectedTopics);
-        metadata.update(new Cluster(
+        metadata.update(new Cluster(null,
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
@@ -145,6 +148,36 @@ public class MetadataTest {
     }
 
     @Test
+    public void testClusterListenerGetsNotifiedOfUpdate() {
+        long time = 0;
+        MockClusterResourceListener mockClusterListener = new 
MockClusterResourceListener();
+        ClusterResourceListeners listeners = new ClusterResourceListeners();
+        listeners.maybeAdd(mockClusterListener);
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, 
listeners);
+
+        String hostName = "www.example.com";
+        Cluster cluster = Cluster.bootstrap(Arrays.asList(new 
InetSocketAddress(hostName, 9002)));
+        metadata.update(cluster, time);
+        assertFalse("ClusterResourceListener should not called when metadata 
is updated with bootstrap Cluster",
+                MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
+
+        metadata.update(new Cluster(
+                        "dummy",
+                        Arrays.asList(new Node(0, "host1", 1000)),
+                        Arrays.asList(
+                                new PartitionInfo("topic", 0, null, null, 
null),
+                                new PartitionInfo("topic1", 0, null, null, 
null)),
+                        Collections.<String>emptySet(),
+                        Collections.<String>emptySet()),
+                100);
+
+        assertEquals("MockClusterResourceListener did not get cluster metadata 
correctly",
+                "dummy", mockClusterListener.clusterResource().clusterId());
+        assertTrue("MockClusterResourceListener should be called when metadata 
is updated with non-bootstrap Cluster",
+                MockClusterResourceListener.IS_ON_UPDATE_CALLED.get());
+    }
+
+    @Test
     public void testListenerGetsNotifiedOfUpdate() {
         long time = 0;
         final Set<String> topics = new HashSet<>();
@@ -158,6 +191,7 @@ public class MetadataTest {
         });
 
         metadata.update(new Cluster(
+                null,
                 Arrays.asList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
@@ -185,6 +219,7 @@ public class MetadataTest {
         metadata.addListener(listener);
 
         metadata.update(new Cluster(
+                "cluster",
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic", 0, null, null, null),
@@ -196,6 +231,7 @@ public class MetadataTest {
         metadata.removeListener(listener);
 
         metadata.update(new Cluster(
+                "cluster",
                 Arrays.asList(new Node(0, "host1", 1000)),
                 Arrays.asList(
                     new PartitionInfo("topic2", 0, null, null, null),
@@ -210,7 +246,7 @@ public class MetadataTest {
 
     @Test
     public void testTopicExpiry() throws Exception {
-        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true, new 
ClusterResourceListeners());
 
         // Test that topic is expired if not used within the expiry interval
         long time = 0;
@@ -242,7 +278,7 @@ public class MetadataTest {
 
     @Test
     public void testNonExpiringMetadata() throws Exception {
-        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false);
+        metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false, new 
ClusterResourceListeners());
 
         // Test that topic is not expired if not used within the expiry 
interval
         long time = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0791f13..a910f2f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -291,6 +291,9 @@ public class KafkaConsumerTest {
             consumer.close();
             assertEquals(1, MockConsumerInterceptor.INIT_COUNT.get());
             assertEquals(1, MockConsumerInterceptor.CLOSE_COUNT.get());
+            // Cluster metadata will only be updated on calling poll.
+            Assert.assertNull(MockConsumerInterceptor.CLUSTER_META.get());
+
         } finally {
             // cleanup since we are using mutable static variables in 
MockConsumerInterceptor
             MockConsumerInterceptor.resetCounters();

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 90ddcb6..15bd9a2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -694,7 +694,7 @@ public class FetcherTest {
         }
 
         MetadataResponse.TopicMetadata topicMetadata = new 
MetadataResponse.TopicMetadata(error, topic, false, partitionsMetadata);
-        return new MetadataResponse(cluster.nodes(), 
MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
+        return new MetadataResponse(cluster.nodes(), null, 
MetadataResponse.NO_CONTROLLER_ID, Arrays.asList(topicMetadata));
     }
 
     private Fetcher<byte[], byte[]> createFetcher(SubscriptionState 
subscriptions,

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 740a57d..333fbfd 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -104,6 +104,9 @@ public class KafkaProducerTest {
             Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
             Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
 
+            // Cluster metadata will only be updated on calling onSend.
+            Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());
+
             producer.close();
             Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
             Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
@@ -152,11 +155,12 @@ public class KafkaProducerTest {
         String topic = "topic";
         ProducerRecord<String, String> record = new ProducerRecord<>(topic, "value");
         Collection<Node> nodes = Collections.singletonList(new Node(0, "host1", 1000));
-        final Cluster emptyCluster = new Cluster(nodes,
+        final Cluster emptyCluster = new Cluster(null, nodes,
                 Collections.<PartitionInfo>emptySet(),
                 Collections.<String>emptySet(),
                 Collections.<String>emptySet());
         final Cluster cluster = new Cluster(
+                "dummy",
                 Collections.singletonList(new Node(0, "host1", 1000)),
                 Arrays.asList(new PartitionInfo(topic, 0, null, null, null)),
                 Collections.<String>emptySet(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 9017869..e3f86ef 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -43,8 +43,8 @@ public class MockProducerTest {
     @Test
     @SuppressWarnings("unchecked")
     public void testAutoCompleteMock() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(true, new MockSerializer(), new MockSerializer());
-        ProducerRecord<byte[], byte[]> record = new ProducerRecord<byte[], byte[]>(topic, "key".getBytes(), "value".getBytes());
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(true, new MockSerializer(), new MockSerializer());
+        ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes());
         Future<RecordMetadata> metadata = producer.send(record);
         assertTrue("Send should be immediately complete", metadata.isDone());
         assertFalse("Send should be successful", isError(metadata));
@@ -59,9 +59,10 @@ public class MockProducerTest {
     public void testPartitioner() throws Exception {
         PartitionInfo partitionInfo0 = new PartitionInfo(topic, 0, null, null, null);
         PartitionInfo partitionInfo1 = new PartitionInfo(topic, 1, null, null, null);
-        Cluster cluster = new Cluster(new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1), Collections.<String>emptySet(), Collections.<String>emptySet());
-        MockProducer<String, String> producer = new MockProducer<String, String>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
-        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "key", "value");
+        Cluster cluster = new Cluster(null, new ArrayList<Node>(0), asList(partitionInfo0, partitionInfo1),
+                Collections.<String>emptySet(), Collections.<String>emptySet());
+        MockProducer<String, String> producer = new MockProducer<>(cluster, true, new DefaultPartitioner(), new StringSerializer(), new StringSerializer());
+        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "key", "value");
         Future<RecordMetadata> metadata = producer.send(record);
         assertEquals("Partition should be correct", 1, 
metadata.get().partition());
         producer.clear();
@@ -70,9 +71,9 @@ public class MockProducerTest {
 
     @Test
     public void testManualCompletion() throws Exception {
-        MockProducer<byte[], byte[]> producer = new MockProducer<byte[], byte[]>(false, new MockSerializer(), new MockSerializer());
-        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<byte[], byte[]>(topic, "key1".getBytes(), "value1".getBytes());
-        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<byte[], byte[]>(topic, "key2".getBytes(), "value2".getBytes());
+        MockProducer<byte[], byte[]> producer = new MockProducer<>(false, new MockSerializer(), new MockSerializer());
+        ProducerRecord<byte[], byte[]> record1 = new ProducerRecord<>(topic, "key1".getBytes(), "value1".getBytes());
+        ProducerRecord<byte[], byte[]> record2 = new ProducerRecord<>(topic, "key2".getBytes(), "value2".getBytes());
         Future<RecordMetadata> md1 = producer.send(record1);
         assertFalse("Send shouldn't have completed", md1.isDone());
         Future<RecordMetadata> md2 = producer.send(record2);

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
index 9748222..ee8441c 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/DefaultPartitionerTest.java
@@ -37,8 +37,8 @@ public class DefaultPartitionerTest {
     private List<PartitionInfo> partitions = asList(new PartitionInfo(topic, 1, null, nodes, nodes),
                                                     new PartitionInfo(topic, 2, node1, nodes, nodes),
                                                     new PartitionInfo(topic, 0, node0, nodes, nodes));
-    private Cluster cluster = new Cluster(asList(node0, node1, node2), partitions, Collections.<String>emptySet(),
-            Collections.<String>emptySet());
+    private Cluster cluster = new Cluster("clusterId", asList(node0, node1, node2), partitions,
+            Collections.<String>emptySet(), Collections.<String>emptySet());
 
     @Test
     public void testKeyPartitionIsStable() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 37f87cc..216f07e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -64,7 +64,8 @@ public class RecordAccumulatorTest {
     private byte[] key = "key".getBytes();
     private byte[] value = "value".getBytes();
     private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value);
-    private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3), Collections.<String>emptySet(), Collections.<String>emptySet());
+    private Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3),
+            Collections.<String>emptySet(), Collections.<String>emptySet());
     private Metrics metrics = new Metrics(time);
     private final long maxBlockTimeMs = 1000;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index b8a086b..b7f9e74 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -60,7 +61,7 @@ public class SenderTest {
     private MockTime time = new MockTime();
     private MockClient client = new MockClient(time);
     private int batchSize = 16 * 1024;
-    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+    private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true, new ClusterResourceListeners());
     private Cluster cluster = TestUtils.singletonCluster("test", 1);
     private Metrics metrics = null;
     private RecordAccumulator accumulator = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 766c745..dd77e03 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -73,8 +73,8 @@ public class RequestResponseTest {
                 createListOffsetResponse(),
                 MetadataRequest.allTopics(),
                 createMetadataRequest(Arrays.asList("topic1")),
-                createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()),
-                createMetadataResponse(1),
+                createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(2, new UnknownServerException()),
+                createMetadataResponse(2),
                 createOffsetCommitRequest(2),
                 createOffsetCommitRequest(2).getErrorResponse(2, new 
UnknownServerException()),
                 createOffsetCommitResponse(),
@@ -112,8 +112,10 @@ public class RequestResponseTest {
         for (AbstractRequestResponse req : requestResponseList)
             checkSerialization(req, null);
 
-        createMetadataResponse(0);
-        createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new 
UnknownServerException());
+        checkSerialization(createMetadataResponse(0), 0);
+        checkSerialization(createMetadataResponse(1), 1);
+        checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(0, new UnknownServerException()), 0);
+        checkSerialization(createMetadataRequest(Arrays.asList("topic1")).getErrorResponse(1, new UnknownServerException()), 1);
         checkSerialization(createFetchRequest().getErrorResponse(0, new UnknownServerException()), 0);
         checkSerialization(createOffsetCommitRequest(0), 0);
         checkSerialization(createOffsetCommitRequest(0).getErrorResponse(0, new UnknownServerException()), 0);
@@ -315,7 +317,7 @@ public class RequestResponseTest {
         allTopicMetadata.add(new MetadataResponse.TopicMetadata(Errors.LEADER_NOT_AVAILABLE, "topic2", false,
                 Collections.<MetadataResponse.PartitionMetadata>emptyList()));
 
-        return new MetadataResponse(Arrays.asList(node), MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
+        return new MetadataResponse(Arrays.asList(node), null, MetadataResponse.NO_CONTROLLER_ID, allTopicMetadata, version);
     }
 
     private AbstractRequest createOffsetCommitRequest(int version) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockClusterResourceListener.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockClusterResourceListener.java 
b/clients/src/test/java/org/apache/kafka/test/MockClusterResourceListener.java
new file mode 100644
index 0000000..dc12a83
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/test/MockClusterResourceListener.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.ClusterResource;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class MockClusterResourceListener implements ClusterResourceListener {
+
+    private ClusterResource clusterResource;
+    public static final AtomicBoolean IS_ON_UPDATE_CALLED = new AtomicBoolean();
+
+    @Override
+    public void onUpdate(ClusterResource clusterResource) {
+        IS_ON_UPDATE_CALLED.set(true);
+        this.clusterResource = clusterResource;
+    }
+
+    public ClusterResource clusterResource() {
+        return clusterResource;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java 
b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
index cff12a3..8bf6983 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockConsumerInterceptor.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.test;
 
-
 import org.apache.kafka.clients.consumer.ConsumerInterceptor;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 
@@ -32,11 +33,15 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
-public class MockConsumerInterceptor implements ConsumerInterceptor<String, String> {
+public class MockConsumerInterceptor implements ClusterResourceListener, ConsumerInterceptor<String, String> {
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_COMMIT_COUNT = new AtomicInteger(0);
+    public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
+    public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
+    public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_CONSUME = new AtomicReference<>(NO_CLUSTER_ID);
 
     public MockConsumerInterceptor() {
         INIT_COUNT.incrementAndGet();
@@ -52,6 +57,11 @@ public class MockConsumerInterceptor implements ConsumerInterceptor<String, Stri
 
     @Override
     public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
+
+        // This will ensure that we get the cluster metadata when onConsume is called for the first time
+        // as subsequent compareAndSet operations will fail.
+        CLUSTER_ID_BEFORE_ON_CONSUME.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
+
         Map<TopicPartition, List<ConsumerRecord<String, String>>> recordMap = new HashMap<>();
         for (TopicPartition tp : records.partitions()) {
             List<ConsumerRecord<String, String>> lst = new ArrayList<>();
@@ -81,5 +91,12 @@ public class MockConsumerInterceptor implements 
ConsumerInterceptor<String, Stri
         INIT_COUNT.set(0);
         CLOSE_COUNT.set(0);
         ON_COMMIT_COUNT.set(0);
+        CLUSTER_META.set(null);
+        CLUSTER_ID_BEFORE_ON_CONSUME.set(NO_CLUSTER_ID);
+    }
+
+    @Override
+    public void onUpdate(ClusterResource clusterResource) {
+        CLUSTER_META.set(clusterResource);
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java 
b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
new file mode 100644
index 0000000..9c7fca5
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.common.ClusterResource;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
+    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
+    public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
+    public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
+    public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_DESERIALIZE = new AtomicReference<>(NO_CLUSTER_ID);
+
+    public MockDeserializer() {
+        INIT_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public byte[] deserialize(String topic, byte[] data) {
+        // This will ensure that we get the cluster metadata when deserialize is called for the first time
+        // as subsequent compareAndSet operations will fail.
+        CLUSTER_ID_BEFORE_DESERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
+        return data;
+    }
+
+    @Override
+    public void close() {
+        CLOSE_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public void onUpdate(ClusterResource clusterResource) {
+        CLUSTER_META.set(clusterResource);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java 
b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
index de9fcd0..eddaa27 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java
@@ -28,7 +28,6 @@ public class MockMetricsReporter implements MetricsReporter {
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
 
     public MockMetricsReporter() {
-
     }
 
     @Override
@@ -49,6 +48,5 @@ public class MockMetricsReporter implements MetricsReporter {
 
     @Override
     public void configure(Map<String, ?> configs) {
-
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/ecc1fb10/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java 
b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index 9e4d0de..9c4721b 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -20,19 +20,24 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.ClusterResourceListener;
+import org.apache.kafka.common.ClusterResource;
 import org.apache.kafka.common.config.ConfigException;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
-
-public class MockProducerInterceptor implements ProducerInterceptor<String, String> {
+public class MockProducerInterceptor implements ClusterResourceListener, ProducerInterceptor<String, String> {
     public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
     public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new AtomicInteger(0);
+    public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
+    public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
+    public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT = new AtomicReference<>(NO_CLUSTER_ID);
     public static final String APPEND_STRING_PROP = "mock.interceptor.append";
     private String appendStr;
 
@@ -65,6 +70,10 @@ public class MockProducerInterceptor implements 
ProducerInterceptor<String, Stri
 
     @Override
     public void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {
+        // This will ensure that we get the cluster metadata when onAcknowledgement is called for the first time
+        // as subsequent compareAndSet operations will fail.
+        CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
+
         if (exception != null) {
             ON_ERROR_COUNT.incrementAndGet();
             if (metadata != null) {
@@ -86,5 +95,12 @@ public class MockProducerInterceptor implements 
ProducerInterceptor<String, Stri
         ON_SUCCESS_COUNT.set(0);
         ON_ERROR_COUNT.set(0);
         ON_ERROR_WITH_METADATA_COUNT.set(0);
+        CLUSTER_META.set(null);
+        CLUSTER_ID_BEFORE_ON_ACKNOWLEDGEMENT.set(NO_CLUSTER_ID);
+    }
+
+    @Override
+    public void onUpdate(ClusterResource clusterResource) {
+        CLUSTER_META.set(clusterResource);
     }
-}
+}
\ No newline at end of file

Reply via email to