This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 902009e KAFKA-3417: Wrap metric reporter calls in try/catch blocks
(#3635)
902009e is described below
commit 902009ea981075bdad178c96eb9e9a835d9cc52f
Author: Mickael Maison <[email protected]>
AuthorDate: Mon Apr 30 12:34:02 2018 +0100
KAFKA-3417: Wrap metric reporter calls in try/catch blocks (#3635)
Prevent exceptions thrown by metric reporters from impacting request processing
and other reporters.
Co-authored-by: Mickael Maison <[email protected]>
Co-authored-by: Edoardo Comar <[email protected]>
Reviewers: Rajini Sivaram <[email protected]>
---
.../org/apache/kafka/common/metrics/Metrics.java | 27 +++--
.../scala/unit/kafka/server/BaseRequestTest.scala | 12 +++
.../KafkaMetricReporterExceptionHandlingTest.scala | 116 +++++++++++++++++++++
.../scala/unit/kafka/server/RequestQuotaTest.scala | 10 --
4 files changed, 149 insertions(+), 16 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index dee69f5..7a8667c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -524,8 +524,13 @@ public class Metrics implements Closeable {
public synchronized KafkaMetric removeMetric(MetricName metricName) {
KafkaMetric metric = this.metrics.remove(metricName);
if (metric != null) {
- for (MetricsReporter reporter : reporters)
- reporter.metricRemoval(metric);
+ for (MetricsReporter reporter : reporters) {
+ try {
+ reporter.metricRemoval(metric);
+ } catch (Exception e) {
+ log.error("Error when removing metric from " +
reporter.getClass().getName(), e);
+ }
+ }
}
return metric;
}
@@ -552,8 +557,13 @@ public class Metrics implements Closeable {
if (this.metrics.containsKey(metricName))
throw new IllegalArgumentException("A metric named '" + metricName
+ "' already exists, can't register another one.");
this.metrics.put(metricName, metric);
- for (MetricsReporter reporter : reporters)
- reporter.metricChange(metric);
+ for (MetricsReporter reporter : reporters) {
+ try {
+ reporter.metricChange(metric);
+ } catch (Exception e) {
+ log.error("Error when registering metric on " +
reporter.getClass().getName(), e);
+ }
+ }
}
/**
@@ -634,8 +644,13 @@ public class Metrics implements Closeable {
}
}
- for (MetricsReporter reporter : this.reporters)
- reporter.close();
+ for (MetricsReporter reporter : reporters) {
+ try {
+ reporter.close();
+ } catch (Exception e) {
+ log.error("Error when closing " +
reporter.getClass().getName(), e);
+ }
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
index f91afd4..99355bc 100644
--- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
@@ -155,6 +155,18 @@ abstract class BaseRequestTest extends
KafkaServerTestHarness {
}
/**
+ * Sends a request built by the builder, waits for the response and parses
it
+ */
+ def requestResponse(socket: Socket, clientId: String, correlationId: Int,
requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
+ val apiKey = requestBuilder.apiKey
+ val request = requestBuilder.build()
+ val header = new RequestHeader(apiKey, request.version, clientId,
correlationId)
+ val response = requestAndReceive(socket, request.serialize(header).array)
+ val responseBuffer = skipResponseHeader(response)
+ apiKey.parseResponse(request.version, responseBuffer)
+ }
+
+ /**
* Serializes and sends the requestStruct to the given api.
* A ByteBuffer containing the response (without the response header) is
returned.
*/
diff --git
a/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
new file mode 100644
index 0000000..30f3b23
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/KafkaMetricReporterExceptionHandlingTest.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package kafka.server
+
+import java.net.Socket
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.requests.{ListGroupsRequest,ListGroupsResponse}
+import org.apache.kafka.common.metrics.MetricsReporter
+import org.apache.kafka.common.metrics.KafkaMetric
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.protocol.Errors
+
+import org.junit.Assert._
+import org.junit.{Before, Test}
+import org.junit.After
+import java.util.concurrent.atomic.AtomicInteger
+
+/*
+ * this test checks that a reporter that throws an exception will not affect
other reporters
+ * and will not affect the broker's message handling
+ */
+class KafkaMetricReporterExceptionHandlingTest extends BaseRequestTest {
+
+ override def numBrokers: Int = 1
+
+ override def propertyOverrides(properties: Properties): Unit = {
+ properties.put(KafkaConfig.MetricReporterClassesProp,
classOf[KafkaMetricReporterExceptionHandlingTest.BadReporter].getName + "," +
classOf[KafkaMetricReporterExceptionHandlingTest.GoodReporter].getName)
+ }
+
+ @Before
+ override def setUp() {
+ super.setUp()
+
+ // need a quota prop to register a "throttle-time" metrics after server
startup
+ val quotaProps = new Properties()
+ quotaProps.put(DynamicConfig.Client.RequestPercentageOverrideProp, "0.1")
+ adminZkClient.changeClientIdConfig("<default>", quotaProps)
+ }
+
+ @After
+ override def tearDown() {
+ KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.set(0)
+ KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.set(0)
+
+ super.tearDown()
+ }
+
+ @Test
+ def testBothReportersAreInvoked() {
+ val port =
anySocketServer.boundPort(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val socket = new Socket("localhost", port)
+ socket.setSoTimeout(10000)
+
+ try {
+ TestUtils.retry(10000) {
+ val error = new ListGroupsResponse(requestResponse(socket, "clientId",
0, new ListGroupsRequest.Builder())).error()
+ assertEquals(Errors.NONE, error)
+
assertEquals(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get,
KafkaMetricReporterExceptionHandlingTest.badReporterRegistered.get)
+
assertTrue(KafkaMetricReporterExceptionHandlingTest.goodReporterRegistered.get
> 0)
+ }
+ } finally {
+ socket.close()
+ }
+ }
+}
+
+object KafkaMetricReporterExceptionHandlingTest {
+ var goodReporterRegistered = new AtomicInteger
+ var badReporterRegistered = new AtomicInteger
+
+ class GoodReporter extends MetricsReporter {
+
+ def configure(configs: java.util.Map[String, _]) {
+ }
+
+ def init(metrics: java.util.List[KafkaMetric]) {
+ }
+
+ def metricChange(metric: KafkaMetric) {
+ if (metric.metricName.group == "Request") {
+ goodReporterRegistered.incrementAndGet
+ }
+ }
+
+ def metricRemoval(metric: KafkaMetric) {
+ }
+
+ def close() {
+ }
+ }
+
+ class BadReporter extends GoodReporter {
+
+ override def metricChange(metric: KafkaMetric) {
+ if (metric.metricName.group == "Request") {
+ badReporterRegistered.incrementAndGet
+ throw new RuntimeException(metric.metricName.toString)
+ }
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ed85415..8a50fca 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -14,7 +14,6 @@
package kafka.server
-import java.net.Socket
import java.nio.ByteBuffer
import java.util.{Collections, LinkedHashMap, Properties}
import java.util.concurrent.{Executors, Future, TimeUnit}
@@ -331,15 +330,6 @@ class RequestQuotaTest extends BaseRequestTest {
}
}
- private def requestResponse(socket: Socket, clientId: String, correlationId:
Int, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Struct = {
- val apiKey = requestBuilder.apiKey
- val request = requestBuilder.build()
- val header = new RequestHeader(apiKey, request.version, clientId,
correlationId)
- val response = requestAndReceive(socket, request.serialize(header).array)
- val responseBuffer = skipResponseHeader(response)
- apiKey.parseResponse(request.version, responseBuffer)
- }
-
case class Client(clientId: String, apiKey: ApiKeys) {
var correlationId: Int = 0
val builder = requestBuilder(apiKey)
--
To stop receiving notification emails like this one, please contact
[email protected].