This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit eb58b61603e8bdb0ccf94af3f956a09b5591d5b3 Author: Chris Egerton <chr...@confluent.io> AuthorDate: Wed Jun 10 20:03:25 2020 -0700 KAFKA-9066: Retain metrics for failed tasks (#8502) Author: Chris Egerton <chr...@confluent.io> Reviewers: Nigel Liang <ni...@nigelliang.com>, Randall Hauch <rha...@gmail.com> --- .../org/apache/kafka/connect/runtime/Worker.java | 7 +++++- .../kafka/connect/runtime/WorkerSinkTask.java | 8 +++++-- .../kafka/connect/runtime/WorkerSourceTask.java | 8 +++++-- .../apache/kafka/connect/runtime/WorkerTask.java | 27 ++++++++-------------- .../kafka/connect/runtime/WorkerTaskTest.java | 9 -------- .../apache/kafka/connect/runtime/WorkerTest.java | 12 ++++++++++ .../runtime/WorkerWithTopicCreationTest.java | 12 ++++++++++ 7 files changed, 52 insertions(+), 31 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 26b0444..67a27e1 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -789,13 +789,18 @@ public class Worker { return; } - connectorStatusMetricsGroup.recordTaskRemoved(taskId); if (!task.awaitStop(timeout)) { log.error("Graceful stop of task {} failed.", task.id()); task.cancel(); } else { log.debug("Graceful stop of task {} succeeded.", task.id()); } + + try { + task.removeMetrics(); + } finally { + connectorStatusMetricsGroup.recordTaskRemoved(taskId); + } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index c22ce4a..50efffc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -174,8 +174,12 @@ class WorkerSinkTask 
extends WorkerTask { } @Override - protected void releaseResources() { - sinkTaskMetricsGroup.close(); + public void removeMetrics() { + try { + sinkTaskMetricsGroup.close(); + } finally { + super.removeMetrics(); + } } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index e44d93a..1febd7f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -188,8 +188,12 @@ class WorkerSourceTask extends WorkerTask { } @Override - protected void releaseResources() { - sourceTaskMetricsGroup.close(); + public void removeMetrics() { + try { + sourceTaskMetricsGroup.close(); + } finally { + super.removeMetrics(); + } } @Override diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java index b0a6a0c..11b0746 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java @@ -144,16 +144,17 @@ abstract class WorkerTask implements Runnable { } } + /** + * Remove all metrics published by this task. + */ + public void removeMetrics() { + taskMetricsGroup.close(); + } + protected abstract void execute(); protected abstract void close(); - /** - * Method called when this worker task has been completely closed, and when the subclass should clean up - * all resources. 
- */ - protected abstract void releaseResources(); - protected boolean isStopping() { return stopping; } @@ -239,17 +240,9 @@ abstract class WorkerTask implements Runnable { if (t instanceof Error) throw (Error) t; } finally { - try { - Thread.currentThread().setName(savedName); - Plugins.compareAndSwapLoaders(savedLoader); - shutdownLatch.countDown(); - } finally { - try { - releaseResources(); - } finally { - taskMetricsGroup.close(); - } - } + Thread.currentThread().setName(savedName); + Plugins.compareAndSwapLoaders(savedLoader); + shutdownLatch.countDown(); } } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java index 44c45d5..c26a88c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java @@ -109,9 +109,6 @@ public class WorkerTaskTest { workerTask.close(); expectLastCall(); - workerTask.releaseResources(); - EasyMock.expectLastCall(); - statusListener.onShutdown(taskId); expectLastCall(); @@ -153,9 +150,6 @@ public class WorkerTaskTest { workerTask.close(); EasyMock.expectLastCall(); - workerTask.releaseResources(); - EasyMock.expectLastCall(); - replay(workerTask); workerTask.initialize(TASK_CONFIG); @@ -219,9 +213,6 @@ public class WorkerTaskTest { workerTask.close(); expectLastCall(); - workerTask.releaseResources(); - EasyMock.expectLastCall(); - // there should be no call to onShutdown() replay(workerTask); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 48ab58f..5177acf 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -602,6 +602,9 @@ 
public class WorkerTest extends ThreadedTest { EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId(); @@ -677,6 +680,9 @@ public class WorkerTest extends ThreadedTest { EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + // Each time we check the task metrics, the worker will call the herder herder.taskStatus(TASK_ID); EasyMock.expectLastCall() @@ -890,6 +896,9 @@ public class WorkerTest extends ThreadedTest { // Note that in this case we *do not* commit offsets since it's an unclean shutdown EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId(); @@ -964,6 +973,9 @@ public class WorkerTest extends ThreadedTest { EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java index 8b54033..f698a30 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java @@ -594,6 +594,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest { EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId(); @@ -669,6 +672,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest { 
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + // Each time we check the task metrics, the worker will call the herder herder.taskStatus(TASK_ID); EasyMock.expectLastCall() @@ -879,6 +885,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest { // Note that in this case we *do not* commit offsets since it's an unclean shutdown EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId(); @@ -953,6 +962,9 @@ public class WorkerWithTopicCreationTest extends ThreadedTest { EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true); EasyMock.expectLastCall(); + workerTask.removeMetrics(); + EasyMock.expectLastCall(); + expectStopStorage(); expectClusterId();