KAFKA-2440; Use `NetworkClient` instead of `SimpleConsumer` to fetch data from replica
Author: Ismael Juma <[email protected]> Reviewers: Aditya Auradkar <[email protected]>, Jun Rao <[email protected]> Closes #194 from ijuma/kafka-2440-use-network-client-in-fetcher Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/65bf3afe Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/65bf3afe Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/65bf3afe Branch: refs/heads/trunk Commit: 65bf3afe86ef883136fcd6bc857724b25e750bd7 Parents: 6c1957d Author: Ismael Juma <[email protected]> Authored: Fri Sep 11 16:08:00 2015 -0700 Committer: Jun Rao <[email protected]> Committed: Fri Sep 11 16:08:00 2015 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 3 +- .../clients/consumer/internals/Fetcher.java | 6 +- .../kafka/common/metrics/JmxReporter.java | 26 ++ .../apache/kafka/common/metrics/Metrics.java | 65 +++- .../kafka/common/metrics/MetricsReporter.java | 6 + .../apache/kafka/common/network/Selector.java | 43 ++- .../common/requests/ListOffsetRequest.java | 3 + .../clients/consumer/internals/FetcherTest.java | 10 +- .../common/metrics/FakeMetricsReporter.java | 5 +- .../kafka/common/metrics/MetricsTest.java | 67 ++++- .../apache/kafka/test/MockMetricsReporter.java | 5 +- .../kafka/consumer/ConsumerFetcherThread.scala | 90 ++++-- .../controller/ControllerChannelManager.scala | 6 +- .../kafka/controller/KafkaController.scala | 20 -- .../kafka/server/AbstractFetcherThread.scala | 212 +++++++------ .../main/scala/kafka/server/KafkaServer.scala | 3 +- .../kafka/server/ReplicaFetcherManager.scala | 9 +- .../kafka/server/ReplicaFetcherThread.scala | 171 +++++++++-- .../scala/kafka/server/ReplicaManager.scala | 8 +- .../integration/BaseTopicMetadataTest.scala | 295 +++++++++++++++++++ .../PlaintextTopicMetadataTest.scala | 23 ++ .../integration/SslTopicMetadataTest.scala | 24 ++ .../kafka/integration/TopicMetadataTest.scala | 291 ------------------ .../kafka/server/BaseReplicaFetchTest.scala | 84 ++++++ .../server/HighwatermarkPersistenceTest.scala | 8 +- .../unit/kafka/server/ISRExpirationTest.scala | 8 +- .../server/PlaintextReplicaFetchTest.scala | 22 ++ .../unit/kafka/server/ReplicaFetchTest.scala | 79 ----- .../unit/kafka/server/ReplicaManagerTest.scala | 14 +- .../unit/kafka/server/SimpleFetchTest.scala | 7 +- .../unit/kafka/server/SslReplicaFetchTest.scala | 24 ++ 31 files changed, 1054 insertions(+), 583 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 1302f35..51a6c5d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -153,7 +153,8 @@ public class NetworkClient implements KafkaClient { @Override public void close(String nodeId) { selector.close(nodeId); - inFlightRequests.clearAll(nodeId); + for (ClientRequest request : inFlightRequests.clearAll(nodeId)) + metadataUpdater.maybeHandleDisconnection(request); connectionStates.remove(nodeId); } http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a797c79..0efd34d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -61,8 +61,6 @@ import java.util.Set; * This class manage the fetching process with the brokers. */ public class Fetcher<K, V> { - public static final long EARLIEST_OFFSET_TIMESTAMP = -2L; - public static final long LATEST_OFFSET_TIMESTAMP = -1L; private static final Logger log = LoggerFactory.getLogger(Fetcher.class); @@ -220,9 +218,9 @@ public class Fetcher<K, V> { OffsetResetStrategy strategy = subscriptions.resetStrategy(partition); final long timestamp; if (strategy == OffsetResetStrategy.EARLIEST) - timestamp = EARLIEST_OFFSET_TIMESTAMP; + timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP; else if (strategy == OffsetResetStrategy.LATEST) - timestamp = LATEST_OFFSET_TIMESTAMP; + timestamp = ListOffsetRequest.LATEST_TIMESTAMP; else throw new NoOffsetForPartitionException("No offset is set and no reset policy is defined"); http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java index 6b9590c..6872049 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java @@ -78,6 +78,28 @@ public class JmxReporter implements MetricsReporter { } } + @Override + public void metricRemoval(KafkaMetric metric) { + synchronized (LOCK) { + KafkaMbean mbean = removeAttribute(metric); + if (mbean != null) { + if (mbean.metrics.isEmpty()) + unregister(mbean); + else + reregister(mbean); + } + } + } + + private KafkaMbean removeAttribute(KafkaMetric metric) { + MetricName metricName = metric.metricName(); + String mBeanName = getMBeanName(metricName); + KafkaMbean mbean = this.mbeans.get(mBeanName); + if (mbean != null) + mbean.removeAttribute(metricName.name()); + return mbean; + } + private KafkaMbean addAttribute(KafkaMetric metric) { try { MetricName metricName = metric.metricName(); @@ -176,6 +198,10 @@ public class JmxReporter implements MetricsReporter { } } + public KafkaMetric removeAttribute(String name) { + return this.metrics.remove(name); + } + @Override public MBeanInfo getMBeanInfo() { MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()]; http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java index 5f6caf9..42936a1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.metrics; import java.io.Closeable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; @@ -52,6 +53,7 @@ public class Metrics implements Closeable { private final MetricConfig config; private final ConcurrentMap<MetricName, KafkaMetric> metrics; private final ConcurrentMap<String, Sensor> sensors; + private final ConcurrentMap<Sensor, List<Sensor>> childrenSensors; private final List<MetricsReporter> reporters; private final Time time; @@ -86,8 +88,9 @@ public class Metrics implements Closeable { */ public Metrics(MetricConfig defaultConfig, List<MetricsReporter> reporters, Time time) { this.config = defaultConfig; - this.sensors = new CopyOnWriteMap<String, Sensor>(); - this.metrics = new CopyOnWriteMap<MetricName, KafkaMetric>(); + this.sensors = new CopyOnWriteMap<>(); + this.metrics = new CopyOnWriteMap<>(); + this.childrenSensors = new CopyOnWriteMap<>(); this.reporters = Utils.notNull(reporters); this.time = time; for (MetricsReporter reporter : reporters) @@ -136,11 +139,46 @@ public class Metrics implements Closeable { if (s == null) { s = new Sensor(this, name, parents, config == null ? this.config : config, time); this.sensors.put(name, s); + if (parents != null) { + for (Sensor parent : parents) { + List<Sensor> children = childrenSensors.get(parent.name()); + if (children == null) { + children = new ArrayList<>(); + childrenSensors.put(parent, children); + } + children.add(s); + } + } } return s; } /** + * Remove a sensor (if it exists), associated metrics and its children. + * + * @param name The name of the sensor to be removed + */ + public void removeSensor(String name) { + Sensor sensor = sensors.get(name); + if (sensor != null) { + List<Sensor> childSensors = null; + synchronized (sensor) { + synchronized (this) { + if (sensors.remove(name, sensor)) { + for (KafkaMetric metric : sensor.metrics()) + removeMetric(metric.metricName()); + childSensors = childrenSensors.remove(sensor); + } + } + } + if (childSensors != null) { + for (Sensor childSensor : childSensors) + removeSensor(childSensor.name()); + } + } + } + + /** * Add a metric to monitor an object that implements measurable. This metric won't be associated with any sensor. * This is a way to expose existing values as metrics. * @param metricName The name of the metric @@ -167,10 +205,26 @@ public class Metrics implements Closeable { } /** + * Remove a metric if it exists and return it. Return null otherwise. If a metric is removed, `metricRemoval` + * will be invoked for each reporter. + * + * @param metricName The name of the metric + * @return the removed `KafkaMetric` or null if no such metric exists + */ + public synchronized KafkaMetric removeMetric(MetricName metricName) { + KafkaMetric metric = this.metrics.remove(metricName); + if (metric != null) { + for (MetricsReporter reporter : reporters) + reporter.metricRemoval(metric); + } + return metric; + } + + /** * Add a MetricReporter */ public synchronized void addReporter(MetricsReporter reporter) { - Utils.notNull(reporter).init(new ArrayList<KafkaMetric>(metrics.values())); + Utils.notNull(reporter).init(new ArrayList<>(metrics.values())); this.reporters.add(reporter); } @@ -190,6 +244,11 @@ public class Metrics implements Closeable { return this.metrics; } + /* For testing use only. */ + Map<Sensor, List<Sensor>> childrenSensors() { + return Collections.unmodifiableMap(childrenSensors); + } + /** * Close this metrics repository. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java index 7acc19e..e2a1d80 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java @@ -34,6 +34,12 @@ public interface MetricsReporter extends Configurable { public void metricChange(KafkaMetric metric); /** + * This is called whenever a metric is removed + * @param metric + */ + public void metricRemoval(KafkaMetric metric); + + /** * Called when the metrics repository is closed. */ public void close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 4aa5cbb..e1c6866 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -202,8 +202,8 @@ public class Selector implements Selectable { */ @Override public void close() { - List<String> connections = new LinkedList<String>(channels.keySet()); - for (String id: connections) + List<String> connections = new ArrayList<>(channels.keySet()); + for (String id : connections) close(id); try { this.nioSelector.close(); @@ -212,6 +212,7 @@ public class Selector implements Selectable { } catch (SecurityException se) { log.error("Exception closing nioSelector:", se); } + sensors.close(); } /** @@ -558,6 +559,10 @@ public class Selector implements Selectable { public final Sensor selectTime; public final Sensor ioTime; + /* Names of metrics that are not registered through sensors */ + private final List<MetricName> topLevelMetricNames = new ArrayList<>(); + private final List<Sensor> sensors = new ArrayList<>(); + public SelectorMetrics(Metrics metrics) { this.metrics = metrics; String metricGrpName = metricGrpPrefix + "-metrics"; @@ -569,19 +574,19 @@ public class Selector implements Selectable { tagsSuffix.append(tag.getValue()); } - this.connectionClosed = this.metrics.sensor("connections-closed:" + tagsSuffix.toString()); + this.connectionClosed = sensor("connections-closed:" + tagsSuffix.toString()); MetricName metricName = new MetricName("connection-close-rate", metricGrpName, "Connections closed per second in the window.", metricTags); this.connectionClosed.add(metricName, new Rate()); - this.connectionCreated = this.metrics.sensor("connections-created:" + tagsSuffix.toString()); + this.connectionCreated = sensor("connections-created:" + tagsSuffix.toString()); metricName = new MetricName("connection-creation-rate", metricGrpName, "New connections established per second in the window.", metricTags); this.connectionCreated.add(metricName, new Rate()); - this.bytesTransferred = this.metrics.sensor("bytes-sent-received:" + tagsSuffix.toString()); + this.bytesTransferred = sensor("bytes-sent-received:" + tagsSuffix.toString()); metricName = new MetricName("network-io-rate", metricGrpName, "The average number of network operations (reads or writes) on all connections per second.", metricTags); bytesTransferred.add(metricName, new Rate(new Count())); - this.bytesSent = this.metrics.sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred); + this.bytesSent = sensor("bytes-sent:" + tagsSuffix.toString(), bytesTransferred); metricName = new MetricName("outgoing-byte-rate", metricGrpName, "The average number of outgoing bytes sent per second to all servers.", metricTags); this.bytesSent.add(metricName, new Rate()); metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", metricTags); @@ -591,13 +596,13 @@ public class Selector implements Selectable { metricName = new MetricName("request-size-max", metricGrpName, "The maximum size of any request sent in the window.", metricTags); this.bytesSent.add(metricName, new Max()); - this.bytesReceived = this.metrics.sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred); + this.bytesReceived = sensor("bytes-received:" + tagsSuffix.toString(), bytesTransferred); metricName = new MetricName("incoming-byte-rate", metricGrpName, "Bytes/second read off all sockets", metricTags); this.bytesReceived.add(metricName, new Rate()); metricName = new MetricName("response-rate", metricGrpName, "Responses received sent per second.", metricTags); this.bytesReceived.add(metricName, new Rate(new Count())); - this.selectTime = this.metrics.sensor("select-time:" + tagsSuffix.toString()); + this.selectTime = sensor("select-time:" + tagsSuffix.toString()); metricName = new MetricName("select-rate", metricGrpName, "Number of times the I/O layer checked for new I/O to perform per second", metricTags); this.selectTime.add(metricName, new Rate(new Count())); metricName = new MetricName("io-wait-time-ns-avg", metricGrpName, "The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.", metricTags); @@ -605,13 +610,14 @@ public class Selector implements Selectable { metricName = new MetricName("io-wait-ratio", metricGrpName, "The fraction of time the I/O thread spent waiting.", metricTags); this.selectTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); - this.ioTime = this.metrics.sensor("io-time:" + tagsSuffix.toString()); + this.ioTime = sensor("io-time:" + tagsSuffix.toString()); metricName = new MetricName("io-time-ns-avg", metricGrpName, "The average length of time for I/O per select call in nanoseconds.", metricTags); this.ioTime.add(metricName, new Avg()); metricName = new MetricName("io-ratio", metricGrpName, "The fraction of time the I/O thread spent doing I/O", metricTags); this.ioTime.add(metricName, new Rate(TimeUnit.NANOSECONDS)); metricName = new MetricName("connection-count", metricGrpName, "The current number of active connections.", metricTags); + topLevelMetricNames.add(metricName); this.metrics.addMetric(metricName, new Measurable() { public double measure(MetricConfig config, long now) { return channels.size(); @@ -619,6 +625,12 @@ public class Selector implements Selectable { }); } + private Sensor sensor(String name, Sensor... parents) { + Sensor sensor = metrics.sensor(name, parents); + sensors.add(sensor); + return sensor; + } + public void maybeRegisterConnectionMetrics(String connectionId) { if (!connectionId.isEmpty() && metricsPerConnection) { // if one sensor of the metrics has been registered for the connection, @@ -631,7 +643,7 @@ public class Selector implements Selectable { Map<String, String> tags = new LinkedHashMap<String, String>(metricTags); tags.put("node-id", "node-" + connectionId); - nodeRequest = this.metrics.sensor(nodeRequestName); + nodeRequest = sensor(nodeRequestName); MetricName metricName = new MetricName("outgoing-byte-rate", metricGrpName, tags); nodeRequest.add(metricName, new Rate()); metricName = new MetricName("request-rate", metricGrpName, "The average number of requests sent per second.", tags); @@ -642,14 +654,14 @@ public class Selector implements Selectable { nodeRequest.add(metricName, new Max()); String nodeResponseName = "node-" + connectionId + ".bytes-received"; - Sensor nodeResponse = this.metrics.sensor(nodeResponseName); + Sensor nodeResponse = sensor(nodeResponseName); metricName = new MetricName("incoming-byte-rate", metricGrpName, tags); nodeResponse.add(metricName, new Rate()); metricName = new MetricName("response-rate", metricGrpName, "The average number of responses received per second.", tags); nodeResponse.add(metricName, new Rate(new Count())); String nodeTimeName = "node-" + connectionId + ".latency"; - Sensor nodeRequestTime = this.metrics.sensor(nodeTimeName); + Sensor nodeRequestTime = sensor(nodeTimeName); metricName = new MetricName("request-latency-avg", metricGrpName, tags); nodeRequestTime.add(metricName, new Avg()); metricName = new MetricName("request-latency-max", metricGrpName, tags); @@ -679,6 +691,13 @@ public class Selector implements Selectable { nodeRequest.record(bytes, now); } } + + public void close() { + for (MetricName metricName : topLevelMetricNames) + metrics.removeMetric(metricName); + for (Sensor sensor : sensors) + metrics.removeSensor(sensor.name()); + } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java index 6da4a0e..8dfd811 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java @@ -31,6 +31,9 @@ import java.util.List; import java.util.Map; public class ListOffsetRequest extends AbstractRequest { + + public static final long EARLIEST_TIMESTAMP = -2L; + public static final long LATEST_TIMESTAMP = -1L; private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.LIST_OFFSETS.id); private static final String REPLICA_ID_KEY_NAME = "replica_id"; http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index f2a8381..d79a10e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -239,7 +239,7 @@ public class FetcherTest { subscriptions.assign(Arrays.asList(tp)); // with no commit position, we should reset using the default strategy defined above (EARLIEST) - client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP), + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); @@ -253,7 +253,7 @@ public class FetcherTest { subscriptions.assign(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); - client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); @@ -267,7 +267,7 @@ public class FetcherTest { subscriptions.assign(Arrays.asList(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); - client.prepareResponse(listOffsetRequestMatcher(Fetcher.EARLIEST_OFFSET_TIMESTAMP), + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); @@ -282,11 +282,11 @@ public class FetcherTest { subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); // First request gets a disconnect - client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L)), true); // Next one succeeds - client.prepareResponse(listOffsetRequestMatcher(Fetcher.LATEST_OFFSET_TIMESTAMP), + client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); fetcher.updateFetchPositions(Collections.singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java index 7c7ead1..d5dd9b8 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java @@ -27,6 +27,9 @@ public class FakeMetricsReporter implements MetricsReporter { public void metricChange(KafkaMetric metric) {} @Override + public void metricRemoval(KafkaMetric metric) {} + + @Override public void close() {} -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index 0a7dcd8..9096ef7 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -13,6 +13,8 @@ package org.apache.kafka.common.metrics; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; import java.util.Arrays; @@ -134,14 +136,14 @@ public class MetricsTest { /* each metric should have a count equal to one + its children's count */ assertEquals(1.0, gc, EPS); - assertEquals(1.0 + gc, child1.metrics().get(0).value(), EPS); + assertEquals(1.0 + gc, c1, EPS); assertEquals(1.0, c2, EPS); assertEquals(1.0 + c1, p2, EPS); assertEquals(1.0 + c1 + c2, p1, EPS); } @Test(expected = IllegalArgumentException.class) - public void testBadSensorHiearchy() { + public void testBadSensorHierarchy() { Sensor p = metrics.sensor("parent"); Sensor c1 = metrics.sensor("child1", p); Sensor c2 = metrics.sensor("child2", p); @@ -149,6 +151,67 @@ public class MetricsTest { } @Test + public void testRemoveSensor() { + Sensor parent1 = metrics.sensor("test.parent1"); + parent1.add(new MetricName("test.parent1.count", "grp1"), new Count()); + Sensor parent2 = metrics.sensor("test.parent2"); + parent2.add(new MetricName("test.parent2.count", "grp1"), new Count()); + Sensor child1 = metrics.sensor("test.child1", parent1, parent2); + child1.add(new MetricName("test.child1.count", "grp1"), new Count()); + Sensor child2 = metrics.sensor("test.child2", parent2); + child2.add(new MetricName("test.child2.count", "grp1"), new Count()); + Sensor grandChild1 = metrics.sensor("test.gchild2", child2); + grandChild1.add(new MetricName("test.gchild2.count", "grp1"), new Count()); + + Sensor sensor = metrics.getSensor("test.parent1"); + assertNotNull(sensor); + metrics.removeSensor("test.parent1"); + assertNull(metrics.getSensor("test.parent1")); + assertNull(metrics.metrics().get(new MetricName("test.parent1.count", "grp1"))); + assertNull(metrics.getSensor("test.child1")); + assertNull(metrics.childrenSensors().get(sensor)); + assertNull(metrics.metrics().get(new MetricName("test.child1.count", "grp1"))); + + sensor = metrics.getSensor("test.gchild2"); + assertNotNull(sensor); + metrics.removeSensor("test.gchild2"); + assertNull(metrics.getSensor("test.gchild2")); + assertNull(metrics.childrenSensors().get(sensor)); + assertNull(metrics.metrics().get(new MetricName("test.gchild2.count", "grp1"))); + + sensor = metrics.getSensor("test.child2"); + assertNotNull(sensor); + metrics.removeSensor("test.child2"); + assertNull(metrics.getSensor("test.child2")); + assertNull(metrics.childrenSensors().get(sensor)); + assertNull(metrics.metrics().get(new MetricName("test.child2.count", "grp1"))); + + sensor = metrics.getSensor("test.parent2"); + assertNotNull(sensor); + metrics.removeSensor("test.parent2"); + assertNull(metrics.getSensor("test.parent2")); + assertNull(metrics.childrenSensors().get(sensor)); + assertNull(metrics.metrics().get(new MetricName("test.parent2.count", "grp1"))); + + assertEquals(0, metrics.metrics().size()); + } + + @Test + public void testRemoveMetric() { + metrics.addMetric(new MetricName("test1", "grp1"), new Count()); + metrics.addMetric(new MetricName("test2", "grp1"), new Count()); + + assertNotNull(metrics.removeMetric(new MetricName("test1", "grp1"))); + assertNull(metrics.metrics().get(new MetricName("test1", "grp1"))); + assertNotNull(metrics.metrics().get(new MetricName("test2", "grp1"))); + + assertNotNull(metrics.removeMetric(new MetricName("test2", "grp1"))); + assertNull(metrics.metrics().get(new MetricName("test2", "grp1"))); + + assertEquals(0, metrics.metrics().size()); + } + + @Test public void testEventWindowing() { Count count = new Count(); MetricConfig config = new MetricConfig().eventWindow(1).samples(2); http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java index 2a8fa1f..de9fcd0 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java +++ b/clients/src/test/java/org/apache/kafka/test/MockMetricsReporter.java @@ -37,9 +37,10 @@ public class MockMetricsReporter implements MetricsReporter { } @Override - public void metricChange(KafkaMetric metric) { + public void metricChange(KafkaMetric metric) {} - } + @Override + public void metricRemoval(KafkaMetric metric) {} @Override public void close() { http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala index 33ea728..8801ff8 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala @@ -17,12 +17,14 @@ package kafka.consumer +import kafka.api.{OffsetRequest, Request, FetchRequestBuilder, FetchResponsePartitionData} import kafka.cluster.BrokerEndPoint -import kafka.server.AbstractFetcherThread import kafka.message.ByteBufferMessageSet -import kafka.api.{Request, OffsetRequest, FetchResponsePartitionData} -import kafka.common.TopicAndPartition - +import kafka.server.{PartitionFetchState, AbstractFetcherThread} +import kafka.common.{ErrorMapping, TopicAndPartition} +import scala.collection.JavaConverters +import JavaConverters._ +import ConsumerFetcherThread._ class ConsumerFetcherThread(name: String, val config: ConsumerConfig, @@ -32,31 +34,52 @@ class ConsumerFetcherThread(name: String, extends AbstractFetcherThread(name = name, clientId = config.clientId, sourceBroker = sourceBroker, - socketTimeout = config.socketTimeoutMs, - socketBufferSize = config.socketReceiveBufferBytes, - fetchSize = config.fetchMessageMaxBytes, - fetcherBrokerId = Request.OrdinaryConsumerId, - maxWait = config.fetchWaitMaxMs, - minBytes = config.fetchMinBytes, fetchBackOffMs = config.refreshLeaderBackoffMs, isInterruptible = true) { + type REQ = FetchRequest + type PD = PartitionData + + private val clientId = config.clientId + private val fetchSize = config.fetchMessageMaxBytes + + private val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, config.socketTimeoutMs, + config.socketReceiveBufferBytes, config.clientId) + + private val fetchRequestBuilder = new FetchRequestBuilder(). + clientId(clientId). + replicaId(Request.OrdinaryConsumerId). + maxWait(config.fetchWaitMaxMs). + minBytes(config.fetchMinBytes). + requestVersion(kafka.api.FetchRequest.CurrentVersion) + + override def initiateShutdown(): Boolean = { + val justShutdown = super.initiateShutdown() + if (justShutdown && isInterruptible) + simpleConsumer.disconnectToHandleJavaIOBug() + justShutdown + } + + override def shutdown(): Unit = { + super.shutdown() + simpleConsumer.close() + } + // process fetched data - def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { + def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) { val pti = partitionMap(topicAndPartition) if (pti.getFetchOffset != fetchOffset) throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d" .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset)) - pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) + pti.enqueue(partitionData.underlying.messages.asInstanceOf[ByteBufferMessageSet]) } // handle a partition whose offset is out of range and return a new fetch offset def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = { - var startTimestamp : Long = 0 - config.autoOffsetReset match { - case OffsetRequest.SmallestTimeString => startTimestamp = OffsetRequest.EarliestTime - case OffsetRequest.LargestTimeString => startTimestamp = OffsetRequest.LatestTime - case _ => startTimestamp = OffsetRequest.LatestTime + val startTimestamp = config.autoOffsetReset match { + case OffsetRequest.SmallestTimeString => OffsetRequest.EarliestTime + case OffsetRequest.LargestTimeString => OffsetRequest.LatestTime + case _ => OffsetRequest.LatestTime } val newOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, startTimestamp, Request.OrdinaryConsumerId) val pti = partitionMap(topicAndPartition) @@ -70,4 +93,37 @@ class ConsumerFetcherThread(name: String, removePartitions(partitions.toSet) consumerFetcherManager.addPartitionsWithError(partitions) } + + protected def buildFetchRequest(partitionMap: collection.Map[TopicAndPartition, PartitionFetchState]): FetchRequest = { + partitionMap.foreach { case ((topicAndPartition, partitionFetchState)) => + if (partitionFetchState.isActive) + fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, partitionFetchState.offset, + fetchSize) + } + + new FetchRequest(fetchRequestBuilder.build()) + } + + protected def fetch(fetchRequest: FetchRequest): collection.Map[TopicAndPartition, PartitionData] = + simpleConsumer.fetch(fetchRequest.underlying).data.map { case (key, value) => + key -> new PartitionData(value) + } + +} + +object ConsumerFetcherThread { + + class FetchRequest(val underlying: kafka.api.FetchRequest) extends AbstractFetcherThread.FetchRequest { + def isEmpty: Boolean = underlying.requestInfo.isEmpty + def offset(topicAndPartition: TopicAndPartition): Long = underlying.requestInfo(topicAndPartition).offset + } + + class PartitionData(val underlying: FetchResponsePartitionData) extends AbstractFetcherThread.PartitionData { + def errorCode: Short = underlying.error + def toByteBufferMessageSet: ByteBufferMessageSet = underlying.messages.asInstanceOf[ByteBufferMessageSet] + def highWatermark: Long = underlying.hw + def exception: Option[Throwable] = + if (errorCode == ErrorMapping.NoError) None else Some(ErrorMapping.exceptionFor(errorCode)) + + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index da1cff0..b1cf668 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -88,7 +88,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id)) val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerSecurityProtocol) val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port) - val networkClient = controllerContext.networkClientMap.getOrElseUpdate(broker.id, { + val networkClient = { val selector = new Selector( NetworkReceive.UNLIMITED, config.connectionsMaxIdleMs, @@ -108,7 +108,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE ) - }) + } val requestThread = new RequestSendThread(config.brokerId, controllerContext, broker, messageQueue, networkClient, brokerNode, config, time) requestThread.setDaemon(false) brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, broker, messageQueue, requestThread)) @@ -116,7 +116,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) { try { - brokerState.networkClient.close(brokerState.brokerNode.idString) + brokerState.networkClient.close() brokerState.messageQueue.clear() brokerState.requestSendThread.shutdown() brokerStateInfo.remove(brokerState.broker.id) http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 2d0845d..284fa23 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -18,7 +18,6 @@ package kafka.controller import java.util -import org.apache.kafka.clients.NetworkClient import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.requests.{AbstractRequest, AbstractRequestResponse} @@ -58,19 +57,6 @@ class ControllerContext(val zkClient: ZkClient, val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet - /** - * This map is used to ensure the following invariant: at most one `NetworkClient`/`Selector` instance should be - * created per broker during the lifetime of the `metrics` parameter received by `KafkaController` (which has the same - * lifetime as `KafkaController` since they are both shut down during `KafkaServer.shutdown()`). - * - * If we break this invariant, an exception is thrown during the instantiation of `Selector` due to the usage of - * two equal `MetricName` instances for two `Selector` instantiations. This way also helps to maintain the metrics sane. - * - * In the future, we should consider redesigning `ControllerChannelManager` so that we can use a single - * `NetworkClient`/`Selector` for multiple broker connections as that is the intended usage and it may help simplify this code. - */ - private[controller] val networkClientMap = mutable.Map[Int, NetworkClient]() - private var liveBrokersUnderlying: Set[Broker] = Set.empty private var liveBrokerIdsUnderlying: Set[Int] = Set.empty @@ -135,11 +121,6 @@ class ControllerContext(val zkClient: ZkClient, allTopics -= topic } - private[controller] def closeNetworkClients(): Unit = { - networkClientMap.values.foreach(_.close()) - networkClientMap.clear() - } - } @@ -711,7 +692,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt isRunning = false } onControllerResignation() - controllerContext.closeNetworkClients() } def sendRequest(brokerId: Int, apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest, callback: AbstractRequestResponse => Unit = null) = { http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/AbstractFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala index dca975c..21c7e3e 100755 --- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala +++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala @@ -17,19 +17,19 @@ package kafka.server +import java.util.concurrent.locks.ReentrantLock + import kafka.cluster.BrokerEndPoint -import kafka.utils.{Pool, ShutdownableThread} -import kafka.consumer.{PartitionTopicInfo, SimpleConsumer} -import kafka.api.{FetchRequest, FetchResponse, FetchResponsePartitionData, FetchRequestBuilder} -import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition, ErrorMapping} -import kafka.utils.DelayedItem -import kafka.utils.CoreUtils.inLock -import kafka.message.{InvalidMessageException, ByteBufferMessageSet, MessageAndOffset} +import kafka.consumer.PartitionTopicInfo +import kafka.message.{InvalidMessageException, MessageAndOffset, ByteBufferMessageSet} +import kafka.utils.{Pool, ShutdownableThread, DelayedItem} +import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition} import kafka.metrics.KafkaMetricsGroup - +import kafka.utils.CoreUtils.inLock +import org.apache.kafka.common.protocol.Errors +import AbstractFetcherThread._ import scala.collection.{mutable, Set, Map} import java.util.concurrent.TimeUnit -import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicLong import com.yammer.metrics.core.Gauge @@ -40,35 +40,25 @@ import com.yammer.metrics.core.Gauge abstract class AbstractFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint, - socketTimeout: Int, - socketBufferSize: Int, - fetchSize: Int, - fetcherBrokerId: Int = -1, - maxWait: Int = 0, - minBytes: Int = 1, fetchBackOffMs: Int = 0, - fetchRequestVersion: Short = FetchRequest.CurrentVersion, isInterruptible: Boolean = true) extends ShutdownableThread(name, isInterruptible) { + + type REQ <: FetchRequest + type PD <: PartitionData + private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map private val partitionMapLock = new ReentrantLock private val partitionMapCond = partitionMapLock.newCondition() - val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize, clientId) + private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port) val fetcherStats = new FetcherStats(metricId) val fetcherLagStats = new FetcherLagStats(metricId) - val fetchRequestBuilder = new FetchRequestBuilder(). - clientId(clientId). - replicaId(fetcherBrokerId). - maxWait(maxWait). - minBytes(minBytes). - requestVersion(fetchRequestVersion) /* callbacks to be defined in subclass */ // process fetched data - def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, - partitionData: FetchResponsePartitionData) + def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD) // handle a partition whose offset is out of range and return a new fetch offset def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long @@ -76,45 +66,40 @@ abstract class AbstractFetcherThread(name: String, // deal with partitions with errors, potentially due to leadership changes def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) + protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ + + protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD] + override def shutdown(){ - val justShutdown = initiateShutdown() - if (justShutdown && isInterruptible) - simpleConsumer.disconnectToHandleJavaIOBug() + initiateShutdown() inLock(partitionMapLock) { partitionMapCond.signalAll() } awaitShutdown() - simpleConsumer.close() } override def doWork() { - var fetchRequest: FetchRequest = null - - inLock(partitionMapLock) { - partitionMap.foreach { - case((topicAndPartition, partitionFetchState)) => - if(partitionFetchState.isActive) - fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, - partitionFetchState.offset, fetchSize) - } - fetchRequest = fetchRequestBuilder.build() - if (fetchRequest.requestInfo.isEmpty) { + val fetchRequest = inLock(partitionMapLock) { + val fetchRequest = buildFetchRequest(partitionMap) + if (fetchRequest.isEmpty) { trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs)) partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS) } + fetchRequest } - if(!fetchRequest.requestInfo.isEmpty) + if (!fetchRequest.isEmpty) processFetchRequest(fetchRequest) } - private def processFetchRequest(fetchRequest: FetchRequest) { + private def processFetchRequest(fetchRequest: REQ) { val partitionsWithError = new mutable.HashSet[TopicAndPartition] - var response: FetchResponse = null + var responseData: Map[TopicAndPartition, PD] = Map.empty + try { trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) - response = simpleConsumer.fetch(fetchRequest) + responseData = fetch(fetchRequest) } catch { case t: Throwable => if (isRunning.get) { @@ -128,64 +113,64 @@ abstract class AbstractFetcherThread(name: String, } fetcherStats.requestRate.mark() - if (response != null) { + if (responseData.nonEmpty) { // process fetched data inLock(partitionMapLock) { - response.data.foreach { - case(topicAndPartition, partitionData) => - val (topic, partitionId) = topicAndPartition.asTuple - partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState => - // we append to the log if the current offset is defined and it is the same as the offset requested during fetch - if (fetchRequest.requestInfo(topicAndPartition).offset == currentPartitionFetchState.offset) { - partitionData.error match { - case ErrorMapping.NoError => - try { - val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] - val validBytes = messages.validBytes - val newOffset = messages.shallowIterator.toSeq.lastOption match { - case Some(m: MessageAndOffset) => m.nextOffset - case None => currentPartitionFetchState.offset - } - partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) - fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset - fetcherStats.byteRate.mark(validBytes) - // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread - processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData) - } catch { - case ime: InvalidMessageException => - // we log the error and continue. This ensures two things - // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag - // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and - // should get fixed in the subsequent fetches - logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage) - case e: Throwable => - throw new KafkaException("error processing data for partition [%s,%d] offset %d" - .format(topic, partitionId, currentPartitionFetchState.offset), e) - } - case ErrorMapping.OffsetOutOfRangeCode => - try { - val newOffset = handleOffsetOutOfRange(topicAndPartition) - partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) - error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" - .format(currentPartitionFetchState.offset, topic, partitionId, newOffset)) - } catch { - case e: Throwable => - error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) - partitionsWithError += topicAndPartition + + responseData.foreach { case (topicAndPartition, partitionData) => + val TopicAndPartition(topic, partitionId) = topicAndPartition + partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState => + // we append to the log if the current offset is defined and it is the same as the offset requested during fetch + if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) { + Errors.forCode(partitionData.errorCode) match { + case Errors.NONE => + try { + val messages = partitionData.toByteBufferMessageSet + val validBytes = messages.validBytes + val newOffset = messages.shallowIterator.toSeq.lastOption match { + case Some(m: MessageAndOffset) => m.nextOffset + case None => currentPartitionFetchState.offset } - case _ => - if (isRunning.get) { - error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, - ErrorMapping.exceptionFor(partitionData.error).getClass)) + partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) + fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.highWatermark - newOffset + fetcherStats.byteRate.mark(validBytes) + // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread + processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData) + } catch { + case ime: InvalidMessageException => + // we log the error and continue. This ensures two things + // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag + // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and + // should get fixed in the subsequent fetches + logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage) + case e: Throwable => + throw new KafkaException("error processing data for partition [%s,%d] offset %d" + .format(topic, partitionId, currentPartitionFetchState.offset), e) + } + case Errors.OFFSET_OUT_OF_RANGE => + try { + val newOffset = handleOffsetOutOfRange(topicAndPartition) + partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset)) + error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" + .format(currentPartitionFetchState.offset, topic, partitionId, newOffset)) + } catch { + case e: Throwable => + error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) partitionsWithError += topicAndPartition - } - } + } + case _ => + if (isRunning.get) { + error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, + partitionData.exception.get)) + partitionsWithError += topicAndPartition + } + } }) } } } - if(partitionsWithError.size > 0) { + if (partitionsWithError.nonEmpty) { debug("handling partitions with error for %s".format(partitionsWithError)) handlePartitionsWithErrors(partitionsWithError) } @@ -203,9 +188,7 @@ abstract class AbstractFetcherThread(name: String, else new PartitionFetchState(offset) )} partitionMapCond.signalAll() - } finally { - partitionMapLock.unlock() - } + } finally partitionMapLock.unlock() } def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) { @@ -213,33 +196,42 @@ abstract class AbstractFetcherThread(name: String, try { for (partition <- partitions) { partitionMap.get(partition).foreach (currentPartitionFetchState => - if(currentPartitionFetchState.isActive) + if (currentPartitionFetchState.isActive) partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay))) ) } partitionMapCond.signalAll() - } finally { - partitionMapLock.unlock() - } + } finally partitionMapLock.unlock() } def removePartitions(topicAndPartitions: Set[TopicAndPartition]) { partitionMapLock.lockInterruptibly() - try { - topicAndPartitions.foreach(tp => partitionMap.remove(tp)) - } finally { - partitionMapLock.unlock() - } + try topicAndPartitions.foreach(partitionMap.remove) + finally partitionMapLock.unlock() } def partitionCount() = { partitionMapLock.lockInterruptibly() - try { - partitionMap.size - } finally { - partitionMapLock.unlock() - } + try partitionMap.size + finally partitionMapLock.unlock() } + +} + +object AbstractFetcherThread { + + trait FetchRequest { + def isEmpty: Boolean + def offset(topicAndPartition: TopicAndPartition): Long + } + + trait PartitionData { + def errorCode: Short + def exception: Option[Throwable] + def toByteBufferMessageSet: ByteBufferMessageSet + def highWatermark: Long + } + } class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup { http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 30406ce..f3f1fa6 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -178,7 +178,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg socketServer.startup() /* start replica manager */ - replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler, logManager, isShuttingDown) + replicaManager = new ReplicaManager(config, metrics, time, kafkaMetricsTime, zkClient, kafkaScheduler, logManager, + isShuttingDown) replicaManager.startup() /* start kafka controller */ http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala index ef38ed3..6e845e9 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala @@ -18,13 +18,16 @@ package kafka.server import kafka.cluster.BrokerEndPoint +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.utils.Time -class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager) +class ReplicaFetcherManager(brokerConfig: KafkaConfig, replicaMgr: ReplicaManager, metrics: Metrics, time: Time) extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, "Replica", brokerConfig.numReplicaFetchers) { override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = { - new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr) + new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, + replicaMgr, metrics, time) } def shutdown() { @@ -32,4 +35,4 @@ class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val r closeAllFetchers() info("shutdown completed") } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 711d749..6c85e52 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -17,47 +17,96 @@ package kafka.server +import java.net.SocketTimeoutException + import kafka.admin.AdminUtils import kafka.cluster.BrokerEndPoint import kafka.log.LogConfig import kafka.message.ByteBufferMessageSet -import kafka.api.{KAFKA_083, OffsetRequest, FetchResponsePartitionData} +import kafka.api.KAFKA_083 import kafka.common.{KafkaStorageException, TopicAndPartition} +import ReplicaFetcherThread._ +import org.apache.kafka.clients.{ManualMetadataUpdater, NetworkClient, ClientRequest, ClientResponse} +import org.apache.kafka.common.network.{Selectable, ChannelBuilders, NetworkReceive, Selector} +import org.apache.kafka.common.requests.{ListOffsetResponse, FetchResponse, RequestSend, AbstractRequest, ListOffsetRequest} +import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} +import org.apache.kafka.common.{Node, TopicPartition} +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.protocol.{Errors, ApiKeys} +import org.apache.kafka.common.security.ssl.SSLFactory +import org.apache.kafka.common.utils.Time + +import scala.collection.{JavaConverters, Map, mutable} +import JavaConverters._ -class ReplicaFetcherThread(name:String, +class ReplicaFetcherThread(name: String, sourceBroker: BrokerEndPoint, brokerConfig: KafkaConfig, - replicaMgr: ReplicaManager) + replicaMgr: ReplicaManager, + metrics: Metrics, + time: Time) extends AbstractFetcherThread(name = name, clientId = name, sourceBroker = sourceBroker, - socketTimeout = brokerConfig.replicaSocketTimeoutMs, - socketBufferSize = brokerConfig.replicaSocketReceiveBufferBytes, - fetchSize = brokerConfig.replicaFetchMaxBytes, - fetcherBrokerId = brokerConfig.brokerId, - maxWait = brokerConfig.replicaFetchWaitMaxMs, - minBytes = brokerConfig.replicaFetchMinBytes, fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, - fetchRequestVersion = - if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0, isInterruptible = false) { + type REQ = FetchRequest + type PD = PartitionData + + private val fetchRequestVersion: Short = if (brokerConfig.interBrokerProtocolVersion.onOrAfter(KAFKA_083)) 1 else 0 + private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs + private val replicaId = brokerConfig.brokerId + private val maxWait = brokerConfig.replicaFetchWaitMaxMs + private val minBytes = brokerConfig.replicaFetchMinBytes + private val fetchSize = brokerConfig.replicaFetchMaxBytes + + private def clientId = name + + private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port) + + private val networkClient = { + val selector = new Selector( + NetworkReceive.UNLIMITED, + brokerConfig.connectionsMaxIdleMs, + metrics, + time, + "replica-fetcher", + Map("broker-id" -> sourceBroker.id.toString).asJava, + false, + ChannelBuilders.create(brokerConfig.interBrokerSecurityProtocol, SSLFactory.Mode.CLIENT, brokerConfig.channelConfigs) + ) + new NetworkClient( + selector, + new ManualMetadataUpdater(), + clientId, + 1, + 0, + Selectable.USE_DEFAULT_BUFFER_SIZE, + brokerConfig.replicaSocketReceiveBufferBytes + ) + } + + override def shutdown(): Unit = { + super.shutdown() + networkClient.close() + } + // process fetched data - def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { + def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PartitionData) { try { - val topic = topicAndPartition.topic - val partitionId = topicAndPartition.partition + val TopicAndPartition(topic, partitionId) = topicAndPartition val replica = replicaMgr.getReplica(topic, partitionId).get - val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet] + val messageSet = partitionData.toByteBufferMessageSet if (fetchOffset != replica.logEndOffset.messageOffset) throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset)) trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" - .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw)) + .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.highWatermark)) replica.log.get.append(messageSet, assignOffsets = false) trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition)) - val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw) + val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) // for the follower replica, we do not need to keep // its segment base offset the physical position, // these values will be computed upon making the leader @@ -87,7 +136,9 @@ class ReplicaFetcherThread(name:String, * * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. */ - val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId) + val leaderEndOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.LATEST_TIMESTAMP, + brokerConfig.brokerId) + if (leaderEndOffset < replica.logEndOffset.messageOffset) { // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, @@ -112,7 +163,8 @@ class ReplicaFetcherThread(name:String, * * Roll out a new log at the follower with the start offset equal to the current leader's start offset and continue fetching. */ - val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId) + val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, + brokerConfig.brokerId) warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset) @@ -124,4 +176,85 @@ class ReplicaFetcherThread(name:String, def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) { delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) } + + protected def fetch(fetchRequest: FetchRequest): Map[TopicAndPartition, PartitionData] = { + val clientResponse = sendRequest(ApiKeys.FETCH, Some(fetchRequestVersion), fetchRequest.underlying) + new FetchResponse(clientResponse.responseBody).responseData.asScala.map { case (key, value) => + TopicAndPartition(key.topic, key.partition) -> new PartitionData(value) + } + } + + private def sendRequest(apiKey: ApiKeys, apiVersion: Option[Short], request: AbstractRequest): ClientResponse = { + import kafka.utils.NetworkClientBlockingOps._ + val header = apiVersion.fold(networkClient.nextRequestHeader(apiKey))(networkClient.nextRequestHeader(apiKey, _)) + try { + if (!networkClient.blockingReady(sourceNode, socketTimeout)(time)) + throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms") + else { + val send = new RequestSend(sourceBroker.id.toString, header, request.toStruct) + val clientRequest = new ClientRequest(time.milliseconds(), true, send, null) + networkClient.blockingSendAndReceive(clientRequest, socketTimeout)(time).getOrElse { + throw new SocketTimeoutException(s"No response received within $socketTimeout ms") + } + } + } + catch { + case e: Throwable => + networkClient.close(sourceBroker.id.toString) + throw e + } + + } + + private def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = { + val topicPartition = new TopicPartition(topicAndPartition.topic, topicAndPartition.partition) + val partitions = Map( + topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1) + ) + val request = new ListOffsetRequest(consumerId, partitions.asJava) + val clientResponse = sendRequest(ApiKeys.LIST_OFFSETS, None, request) + val response = new ListOffsetResponse(clientResponse.responseBody) + val partitionData = response.responseData.get(topicPartition) + Errors.forCode(partitionData.errorCode) match { + case Errors.NONE => partitionData.offsets.asScala.head + case errorCode => throw errorCode.exception + } + } + + protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): FetchRequest = { + val requestMap = mutable.Map.empty[TopicPartition, JFetchRequest.PartitionData] + + partitionMap.foreach { case ((TopicAndPartition(topic, partition), partitionFetchState)) => + if (partitionFetchState.isActive) + requestMap(new TopicPartition(topic, partition)) = new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize) + } + + new FetchRequest(new JFetchRequest(replicaId, maxWait, minBytes, requestMap.asJava)) + } + +} + +object ReplicaFetcherThread { + + private[server] class FetchRequest(val underlying: JFetchRequest) extends AbstractFetcherThread.FetchRequest { + def isEmpty: Boolean = underlying.fetchData.isEmpty + def offset(topicAndPartition: TopicAndPartition): Long = + underlying.fetchData.asScala(new TopicPartition(topicAndPartition.topic, topicAndPartition.partition)).offset + } + + private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData { + + def errorCode: Short = underlying.errorCode + + def toByteBufferMessageSet: ByteBufferMessageSet = new ByteBufferMessageSet(underlying.recordSet) + + def highWatermark: Long = underlying.highWatermark + + def exception: Option[Throwable] = Errors.forCode(errorCode) match { + case Errors.NONE => None + case e => Some(e.exception) + } + + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/65bf3afe/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index c195536..3e287ea 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -30,7 +30,9 @@ import kafka.message.{ByteBufferMessageSet, MessageSet} import kafka.metrics.KafkaMetricsGroup import kafka.utils._ import org.I0Itec.zkclient.ZkClient +import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.utils.{Time => JTime} import scala.collection._ @@ -94,7 +96,9 @@ object ReplicaManager { } class ReplicaManager(val config: KafkaConfig, - private val time: Time, + metrics: Metrics, + time: Time, + jTime: JTime, val zkClient: ZkClient, scheduler: Scheduler, val logManager: LogManager, @@ -104,7 +108,7 @@ class ReplicaManager(val config: KafkaConfig, private val localBrokerId = config.brokerId private val allPartitions = new Pool[(String, Int), Partition] private val replicaStateChangeLock = new Object - val replicaFetcherManager = new ReplicaFetcherManager(config, this) + val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, jTime) private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false) val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap private var hwThreadInitialized = false
