This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 8e00835  KAFKA-9887 fix failed task or connector count on startup failure (#8844)
8e00835 is described below

commit 8e00835b108b3779fbd927a41f3c3c3a0606c4d7
Author: Michael Carter <53591940+michael-carter-instaclu...@users.noreply.github.com>
AuthorDate: Wed Jul 21 08:39:26 2021 +1000

    KAFKA-9887 fix failed task or connector count on startup failure (#8844)
    
    Moved the responsibility for recording task and connector startup and failure
    metrics from the invocation code into the status listener. The reason behind
    this is that the WorkerTasks (and subclasses) were either not propagating
    exceptions upwards, or were unable to do so easily because they were running
    on completely different threads.
    
    Also split out WorkerMetricsGroup from being an inner class into being a
    standard class. This was to make sure the Data Abstraction Count checkStyle
    rule was not violated.
    
    Author: Michael Carter <michael.car...@instaclustr.com>
    Reviewers: Chris Egerton <chr...@confluent.io>, Randall Hauch <rha...@gmail.com>
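
The patch below records startup metrics from wrapper status listeners instead of the invocation path. A minimal, standalone sketch of that decorator pattern follows; the StartupListener and StartupMetrics types here are invented for illustration and are not the real Connect runtime interfaces.

    // Sketch of the pattern this commit applies: metric recording moves out of the
    // invocation path and into a listener wrapper, so a failure reported from any
    // thread is counted once, and only if startup never succeeded.
    // StartupListener/StartupMetrics are placeholders, not the Connect API.
    import java.util.concurrent.atomic.AtomicInteger;

    public class StartupListenerSketch {
        interface StartupListener {
            void onStartup(String name);
            void onFailure(String name, Throwable cause);
        }

        static class StartupMetrics {
            final AtomicInteger startupSuccesses = new AtomicInteger();
            final AtomicInteger startupFailures = new AtomicInteger();
        }

        /** Decorates a delegate listener and records startup metrics on its behalf. */
        static class MeteredListener implements StartupListener {
            private final StartupListener delegate;
            private final StartupMetrics metrics;
            private volatile boolean startupSucceeded = false;

            MeteredListener(StartupListener delegate, StartupMetrics metrics) {
                this.delegate = delegate;
                this.metrics = metrics;
            }

            @Override
            public void onStartup(String name) {
                startupSucceeded = true;
                metrics.startupSuccesses.incrementAndGet();
                delegate.onStartup(name);
            }

            @Override
            public void onFailure(String name, Throwable cause) {
                // Count a startup failure only if startup never succeeded.
                if (!startupSucceeded) {
                    metrics.startupFailures.incrementAndGet();
                }
                delegate.onFailure(name, cause);
            }
        }

        public static void main(String[] args) {
            StartupMetrics metrics = new StartupMetrics();
            StartupListener delegate = new StartupListener() {
                public void onStartup(String name) { System.out.println(name + " started"); }
                public void onFailure(String name, Throwable cause) { System.out.println(name + " failed: " + cause); }
            };
            StartupListener wrapped = new MeteredListener(delegate, metrics);
            wrapped.onFailure("connector-a", new RuntimeException("startup failed"));
            System.out.println("startup failures recorded: " + metrics.startupFailures.get()); // prints 1
        }
    }

In the actual patch, WorkerMetricsGroup.wrapStatusListener plays the role of the decorator factory, and Kafka metric sensors replace the plain counters.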
---
 .../org/apache/kafka/connect/runtime/Worker.java   | 105 +--------
 .../kafka/connect/runtime/WorkerMetricsGroup.java  | 204 +++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTask.java      |   3 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |  25 ++-
 .../apache/kafka/connect/runtime/WorkerTask.java   |   6 +-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   2 +
 .../ErrorHandlingTaskWithTopicCreationTest.java    |   2 +
 .../connect/runtime/WorkerMetricsGroupTest.java    | 249 +++++++++++++++++++++
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |   3 +
 .../kafka/connect/runtime/WorkerTaskTest.java      |   8 +
 .../apache/kafka/connect/runtime/WorkerTest.java   |   6 +-
 .../runtime/WorkerWithTopicCreationTest.java       |   7 +-
 12 files changed, 503 insertions(+), 117 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index fffe71a..27f96de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -21,13 +21,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.config.provider.ConfigProvider;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.CumulativeSum;
-import org.apache.kafka.common.metrics.stats.Frequencies;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -57,8 +53,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
-import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.LoggingContext;
 import org.apache.kafka.connect.util.SinkUtils;
@@ -145,7 +141,7 @@ public class Worker {
         this.plugins = plugins;
         this.config = config;
         this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
-        this.workerMetricsGroup = new WorkerMetricsGroup(metrics);
+        this.workerMetricsGroup = new WorkerMetricsGroup(this.connectors, this.tasks, metrics);
 
         // Internal converters are required properties, thus getClass won't return null.
         this.internalKeyConverter = plugins.newConverter(
@@ -253,6 +249,7 @@ public class Worker {
             TargetState initialState,
             Callback<TargetState> onConnectorStateChange
     ) {
+        final ConnectorStatus.Listener connectorStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
         try (LoggingContext loggingContext = LoggingContext.forConnector(connName)) {
             if (connectors.containsKey(connName)) {
                 onConnectorStateChange.onCompletion(
@@ -280,7 +277,7 @@ public class Worker {
                 final OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(
                         offsetBackingStore, connName, internalKeyConverter, internalValueConverter);
                 workerConnector = new WorkerConnector(
-                        connName, connector, connConfig, ctx, metrics, statusListener, offsetReader, connectorLoader);
+                        connName, connector, connConfig, ctx, metrics, connectorStatusListener, offsetReader, connectorLoader);
                 log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
                 workerConnector.transitionTo(initialState, onConnectorStateChange);
                 Plugins.compareAndSwapLoaders(savedLoader);
@@ -289,8 +286,7 @@ public class Worker {
                 // Can't be put in a finally block because it needs to be swapped before the call on
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
-                workerMetricsGroup.recordConnectorStartupFailure();
-                statusListener.onFailure(connName, t);
+                connectorStatusListener.onFailure(connName, t);
                 onConnectorStateChange.onCompletion(t, null);
                 return;
             }
@@ -308,7 +304,6 @@ public class Worker {
             executor.submit(workerConnector);
 
             log.info("Finished creating connector {}", connName);
-            workerMetricsGroup.recordConnectorStartupSuccess();
         }
     }
 
@@ -505,6 +500,7 @@ public class Worker {
             TargetState initialState
     ) {
         final WorkerTask workerTask;
+        final TaskStatus.Listener taskStatusListener = workerMetricsGroup.wrapStatusListener(statusListener);
         try (LoggingContext loggingContext = LoggingContext.forTask(id)) {
             log.info("Creating task {}", id);
 
@@ -552,8 +548,8 @@ public class Worker {
                     log.info("Set up the header converter {} for task {} using 
the connector config", headerConverter.getClass(), id);
                 }
 
-                workerTask = buildWorkerTask(configState, connConfig, id, 
task, statusListener, initialState, keyConverter, valueConverter,
-                                             headerConverter, connectorLoader);
+                workerTask = buildWorkerTask(configState, connConfig, id, 
task, taskStatusListener,
+                        initialState, keyConverter, valueConverter, 
headerConverter, connectorLoader);
                 workerTask.initialize(taskConfig);
                 Plugins.compareAndSwapLoaders(savedLoader);
             } catch (Throwable t) {
@@ -562,8 +558,7 @@ public class Worker {
                 // statusListener
                 Plugins.compareAndSwapLoaders(savedLoader);
                 connectorStatusMetricsGroup.recordTaskRemoved(id);
-                workerMetricsGroup.recordTaskFailure();
-                statusListener.onFailure(id, t);
+                taskStatusListener.onFailure(id, t);
                 return false;
             }
 
@@ -575,7 +570,6 @@ public class Worker {
             if (workerTask instanceof WorkerSourceTask) {
                 sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
             }
-            workerMetricsGroup.recordTaskSuccess();
             return true;
         }
     }
@@ -1063,85 +1057,4 @@ public class Worker {
         }
     }
 
-    class WorkerMetricsGroup {
-        private final MetricGroup metricGroup;
-        private final Sensor connectorStartupAttempts;
-        private final Sensor connectorStartupSuccesses;
-        private final Sensor connectorStartupFailures;
-        private final Sensor connectorStartupResults;
-        private final Sensor taskStartupAttempts;
-        private final Sensor taskStartupSuccesses;
-        private final Sensor taskStartupFailures;
-        private final Sensor taskStartupResults;
-
-        public WorkerMetricsGroup(ConnectMetrics connectMetrics) {
-            ConnectMetricsRegistry registry = connectMetrics.registry();
-            metricGroup = connectMetrics.group(registry.workerGroupName());
-
-            metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
-            metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
-
-            MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
-            MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
-            Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
-            connectorStartupResults = metricGroup.sensor("connector-startup-results");
-            connectorStartupResults.add(connectorStartupResultFrequencies);
-
-            connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
-            connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
-
-            connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
-            connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
-
-            connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
-            connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
-
-            MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
-            MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
-            Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
-            taskStartupResults = metricGroup.sensor("task-startup-results");
-            taskStartupResults.add(taskStartupResultFrequencies);
-
-            taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
-            taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
-
-            taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
-            taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
-
-            taskStartupFailures = metricGroup.sensor("task-startup-failures");
-            taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
-        }
-
-        void close() {
-            metricGroup.close();
-        }
-
-        void recordConnectorStartupFailure() {
-            connectorStartupAttempts.record(1.0);
-            connectorStartupFailures.record(1.0);
-            connectorStartupResults.record(0.0);
-        }
-
-        void recordConnectorStartupSuccess() {
-            connectorStartupAttempts.record(1.0);
-            connectorStartupSuccesses.record(1.0);
-            connectorStartupResults.record(1.0);
-        }
-
-        void recordTaskFailure() {
-            taskStartupAttempts.record(1.0);
-            taskStartupFailures.record(1.0);
-            taskStartupResults.record(0.0);
-        }
-
-        void recordTaskSuccess() {
-            taskStartupAttempts.record(1.0);
-            taskStartupSuccesses.record(1.0);
-            taskStartupResults.record(1.0);
-        }
-
-        protected MetricGroup metricGroup() {
-            return metricGroup;
-        }
-    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
new file mode 100644
index 0000000..2ad407d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerMetricsGroup.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.common.metrics.stats.Frequencies;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+
+class WorkerMetricsGroup {
+    private final ConnectMetrics.MetricGroup metricGroup;
+    private final Sensor connectorStartupAttempts;
+    private final Sensor connectorStartupSuccesses;
+    private final Sensor connectorStartupFailures;
+    private final Sensor connectorStartupResults;
+    private final Sensor taskStartupAttempts;
+    private final Sensor taskStartupSuccesses;
+    private final Sensor taskStartupFailures;
+    private final Sensor taskStartupResults;
+
+    public WorkerMetricsGroup(final Map<String, WorkerConnector> connectors, Map<ConnectorTaskId, WorkerTask> tasks, ConnectMetrics connectMetrics) {
+        ConnectMetricsRegistry registry = connectMetrics.registry();
+        metricGroup = connectMetrics.group(registry.workerGroupName());
+
+        metricGroup.addValueMetric(registry.connectorCount, now -> (double) connectors.size());
+        metricGroup.addValueMetric(registry.taskCount, now -> (double) tasks.size());
+
+        MetricName connectorFailurePct = metricGroup.metricName(registry.connectorStartupFailurePercentage);
+        MetricName connectorSuccessPct = metricGroup.metricName(registry.connectorStartupSuccessPercentage);
+        Frequencies connectorStartupResultFrequencies = Frequencies.forBooleanValues(connectorFailurePct, connectorSuccessPct);
+        connectorStartupResults = metricGroup.sensor("connector-startup-results");
+        connectorStartupResults.add(connectorStartupResultFrequencies);
+
+        connectorStartupAttempts = metricGroup.sensor("connector-startup-attempts");
+        connectorStartupAttempts.add(metricGroup.metricName(registry.connectorStartupAttemptsTotal), new CumulativeSum());
+
+        connectorStartupSuccesses = metricGroup.sensor("connector-startup-successes");
+        connectorStartupSuccesses.add(metricGroup.metricName(registry.connectorStartupSuccessTotal), new CumulativeSum());
+
+        connectorStartupFailures = metricGroup.sensor("connector-startup-failures");
+        connectorStartupFailures.add(metricGroup.metricName(registry.connectorStartupFailureTotal), new CumulativeSum());
+
+        MetricName taskFailurePct = metricGroup.metricName(registry.taskStartupFailurePercentage);
+        MetricName taskSuccessPct = metricGroup.metricName(registry.taskStartupSuccessPercentage);
+        Frequencies taskStartupResultFrequencies = Frequencies.forBooleanValues(taskFailurePct, taskSuccessPct);
+        taskStartupResults = metricGroup.sensor("task-startup-results");
+        taskStartupResults.add(taskStartupResultFrequencies);
+
+        taskStartupAttempts = metricGroup.sensor("task-startup-attempts");
+        taskStartupAttempts.add(metricGroup.metricName(registry.taskStartupAttemptsTotal), new CumulativeSum());
+
+        taskStartupSuccesses = metricGroup.sensor("task-startup-successes");
+        taskStartupSuccesses.add(metricGroup.metricName(registry.taskStartupSuccessTotal), new CumulativeSum());
+
+        taskStartupFailures = metricGroup.sensor("task-startup-failures");
+        taskStartupFailures.add(metricGroup.metricName(registry.taskStartupFailureTotal), new CumulativeSum());
+    }
+
+    void close() {
+        metricGroup.close();
+    }
+
+    void recordConnectorStartupFailure() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupFailures.record(1.0);
+        connectorStartupResults.record(0.0);
+    }
+
+    void recordConnectorStartupSuccess() {
+        connectorStartupAttempts.record(1.0);
+        connectorStartupSuccesses.record(1.0);
+        connectorStartupResults.record(1.0);
+    }
+
+    void recordTaskFailure() {
+        taskStartupAttempts.record(1.0);
+        taskStartupFailures.record(1.0);
+        taskStartupResults.record(0.0);
+    }
+
+    void recordTaskSuccess() {
+        taskStartupAttempts.record(1.0);
+        taskStartupSuccesses.record(1.0);
+        taskStartupResults.record(1.0);
+    }
+
+    protected ConnectMetrics.MetricGroup metricGroup() {
+        return metricGroup;
+    }
+
+    ConnectorStatus.Listener wrapStatusListener(ConnectorStatus.Listener delegateListener) {
+        return new ConnectorStatusListener(delegateListener);
+    }
+
+    TaskStatus.Listener wrapStatusListener(TaskStatus.Listener delegateListener) {
+        return new TaskStatusListener(delegateListener);
+    }
+
+    class ConnectorStatusListener implements ConnectorStatus.Listener {
+        private final ConnectorStatus.Listener delegateListener;
+        private volatile boolean startupSucceeded = false;
+
+        ConnectorStatusListener(ConnectorStatus.Listener delegateListener) {
+            this.delegateListener = delegateListener;
+        }
+
+        @Override
+        public void onStartup(final String connector) {
+            startupSucceeded = true;
+            recordConnectorStartupSuccess();
+            delegateListener.onStartup(connector);
+        }
+
+        @Override
+        public void onPause(final String connector) {
+            delegateListener.onPause(connector);
+        }
+
+        @Override
+        public void onResume(final String connector) {
+            delegateListener.onResume(connector);
+        }
+
+        @Override
+        public void onFailure(final String connector, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordConnectorStartupFailure();
+            }
+            delegateListener.onFailure(connector, cause);
+        }
+
+        @Override
+        public void onShutdown(final String connector) {
+            delegateListener.onShutdown(connector);
+        }
+
+        @Override
+        public void onDeletion(final String connector) {
+            delegateListener.onDeletion(connector);
+        }
+    }
+
+    class TaskStatusListener implements TaskStatus.Listener {
+        private final TaskStatus.Listener delegatedListener;
+        private volatile boolean startupSucceeded = false;
+
+        TaskStatusListener(TaskStatus.Listener delegatedListener) {
+            this.delegatedListener = delegatedListener;
+        }
+
+        @Override
+        public void onStartup(final ConnectorTaskId id) {
+            recordTaskSuccess();
+            startupSucceeded = true;
+            delegatedListener.onStartup(id);
+        }
+
+        @Override
+        public void onPause(final ConnectorTaskId id) {
+            delegatedListener.onPause(id);
+        }
+
+        @Override
+        public void onResume(final ConnectorTaskId id) {
+            delegatedListener.onResume(id);
+        }
+
+        @Override
+        public void onFailure(final ConnectorTaskId id, final Throwable cause) {
+            if (!startupSucceeded) {
+                recordTaskFailure();
+            }
+            delegatedListener.onFailure(id, cause);
+        }
+
+        @Override
+        public void onShutdown(final ConnectorTaskId id) {
+            delegatedListener.onShutdown(id);
+        }
+
+        @Override
+        public void onDeletion(final ConnectorTaskId id) {
+            delegatedListener.onDeletion(id);
+        }
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 92b2cd9..2e17184 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -193,7 +193,7 @@ class WorkerSinkTask extends WorkerTask {
 
     @Override
     public void execute() {
-        initializeAndStart();
+        log.info("{} Executing sink task", this);
         // Make sure any uncommitted data has been committed and the task has
         // a chance to clean up its state
         try (UncheckedCloseable suppressible = this::closePartitions) {
@@ -290,6 +290,7 @@ class WorkerSinkTask extends WorkerTask {
     /**
      * Initializes and starts the SinkTask.
      */
+    @Override
     protected void initializeAndStart() {
         SinkConnectorConfig.validate(taskConfig);
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 6300b11..3de8a9d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -219,18 +219,23 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
+    protected void initializeAndStart() {
+        // If we try to start the task at all by invoking initialize, then count this as
+        // "started" and expect a subsequent call to the task's stop() method
+        // to properly clean up any resources allocated by its initialize() or
+        // start() methods. If the task throws an exception during stop(),
+        // the worst thing that happens is another exception gets logged for an already-
+        // failed task
+        started = true;
+        task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
+        task.start(taskConfig);
+        log.info("{} Source task finished initialization and start", this);
+    }
+
+    @Override
     public void execute() {
         try {
-            // If we try to start the task at all by invoking initialize, then count this as
-            // "started" and expect a subsequent call to the task's stop() method
-            // to properly clean up any resources allocated by its initialize() or
-            // start() methods. If the task throws an exception during stop(),
-            // the worst thing that happens is another exception gets logged for an already-
-            // failed task
-            started = true;
-            task.initialize(new WorkerSourceTaskContext(offsetReader, this, configState));
-            task.start(taskConfig);
-            log.info("{} Source task finished initialization and start", this);
+            log.info("{} Executing source task", this);
             while (!isStopping()) {
                 if (shouldPause()) {
                     onPause();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index deacb62..8e49733 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -151,6 +151,8 @@ abstract class WorkerTask implements Runnable {
         taskMetricsGroup.close();
     }
 
+    protected abstract void initializeAndStart();
+
     protected abstract void execute();
 
     protected abstract void close();
@@ -182,10 +184,10 @@ abstract class WorkerTask implements Runnable {
                     onPause();
                     if (!awaitUnpause()) return;
                 }
-
-                statusListener.onStartup(id);
             }
 
+            initializeAndStart();
+            statusListener.onStartup(id);
             execute();
         } catch (Throwable t) {
             log.error("{} Task threw an uncaught and unrecoverable exception. 
Task is being killed and will not recover until manually restarted", this, t);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index daa9edd..9d6ca32 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -371,6 +371,7 @@ public class ErrorHandlingTaskTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
@@ -436,6 +437,7 @@ public class ErrorHandlingTaskTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
index 5cdfab9..7f39666 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskWithTopicCreationTest.java
@@ -386,6 +386,7 @@ public class ErrorHandlingTaskWithTopicCreationTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
@@ -451,6 +452,7 @@ public class ErrorHandlingTaskWithTopicCreationTest {
         PowerMock.replayAll();
 
         workerSourceTask.initialize(TASK_CONFIG);
+        workerSourceTask.initializeAndStart();
         workerSourceTask.execute();
 
         // two records were consumed from Kafka
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
new file mode 100644
index 0000000..2eb20ce
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerMetricsGroupTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.MetricNameTemplate;
+import org.apache.kafka.common.metrics.CompoundStat;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.CumulativeSum;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.HashMap;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.anyString;
+import static org.easymock.EasyMock.eq;
+import static org.powermock.api.easymock.PowerMock.expectLastCall;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({Sensor.class, MetricName.class})
+public class WorkerMetricsGroupTest {
+    private final String connector = "org.FakeConnector";
+    private final ConnectorTaskId task = new ConnectorTaskId(connector, 0);
+    private final RuntimeException exception = new RuntimeException();
+
+    private ConnectMetrics connectMetrics;
+    
+    private Sensor connectorStartupResults;
+    private Sensor connectorStartupAttempts;
+    private Sensor connectorStartupSuccesses;
+    private Sensor connectorStartupFailures;
+
+    private Sensor taskStartupResults;
+    private Sensor taskStartupAttempts;
+    private Sensor taskStartupSuccesses;
+    private Sensor taskStartupFailures;
+
+    private ConnectorStatus.Listener delegateConnectorListener;
+    private TaskStatus.Listener delegateTaskListener;
+
+    @Before
+    public void setup() {
+        connectMetrics = PowerMock.createMock(ConnectMetrics.class);
+        ConnectMetricsRegistry connectMetricsRegistry = PowerMock.createNiceMock(ConnectMetricsRegistry.class);
+        ConnectMetrics.MetricGroup metricGroup = PowerMock.createNiceMock(ConnectMetrics.MetricGroup.class);
+
+        connectMetrics.registry();
+        expectLastCall().andReturn(connectMetricsRegistry);
+
+        connectMetrics.group(anyString());
+        expectLastCall().andReturn(metricGroup);
+
+        MetricName metricName = PowerMock.createMock(MetricName.class);
+        metricGroup.metricName(anyObject(MetricNameTemplate.class));
+        expectLastCall().andStubReturn(metricName);
+
+        connectorStartupResults = mockSensor(metricGroup, "connector-startup-results");
+        connectorStartupAttempts = mockSensor(metricGroup, "connector-startup-attempts");
+        connectorStartupSuccesses = mockSensor(metricGroup, "connector-startup-successes");
+        connectorStartupFailures = mockSensor(metricGroup, "connector-startup-failures");
+
+        taskStartupResults = mockSensor(metricGroup, "task-startup-results");
+        taskStartupAttempts = mockSensor(metricGroup, "task-startup-attempts");
+        taskStartupSuccesses = mockSensor(metricGroup, "task-startup-successes");
+        taskStartupFailures = mockSensor(metricGroup, "task-startup-failures");
+
+        delegateConnectorListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
+        delegateTaskListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+    }
+
+    private Sensor mockSensor(ConnectMetrics.MetricGroup metricGroup, String name) {
+        Sensor sensor = PowerMock.createMock(Sensor.class);
+        metricGroup.sensor(eq(name));
+        expectLastCall().andReturn(sensor);
+
+        sensor.add(anyObject(CompoundStat.class));
+        expectLastCall().andStubReturn(true);
+
+        sensor.add(anyObject(MetricName.class), anyObject(CumulativeSum.class));
+        expectLastCall().andStubReturn(true);
+
+        return sensor;
+    }
+    
+    @Test
+    public void testConnectorStartupRecordedMetrics() {
+        delegateConnectorListener.onStartup(eq(connector));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+
+        connectorListener.onStartup(connector);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureAfterStartupRecordedMetrics() {
+        delegateConnectorListener.onStartup(eq(connector));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(1.0));
+        expectLastCall();
+        
+        delegateConnectorListener.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        // recordConnectorStartupFailure() should not be called if failure happens after a successful startup
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+
+        connectorListener.onStartup(connector);
+        connectorListener.onFailure(connector, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConnectorFailureBeforeStartupRecordedMetrics() {
+        delegateConnectorListener.onFailure(eq(connector), eq(exception));
+        expectLastCall();
+
+        connectorStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        connectorStartupFailures.record(eq(1.0));
+        expectLastCall();
+        connectorStartupResults.record(eq(0.0));
+        expectLastCall();
+        
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final ConnectorStatus.Listener connectorListener = workerMetricsGroup.wrapStatusListener(delegateConnectorListener);
+        
+        connectorListener.onFailure(connector, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskStartupRecordedMetrics() {
+        delegateTaskListener.onStartup(eq(task));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onStartup(task);
+
+        PowerMock.verifyAll();
+    }
+    
+    @Test
+    public void testTaskFailureAfterStartupRecordedMetrics() {
+        delegateTaskListener.onStartup(eq(task));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupSuccesses.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(1.0));
+        expectLastCall();
+
+        delegateTaskListener.onFailure(eq(task), eq(exception));
+        expectLastCall();
+
+        // recordTaskFailure() should not be called if failure happens after a successful startup
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onStartup(task);
+        taskListener.onFailure(task, exception);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testTaskFailureBeforeStartupRecordedMetrics() {
+        delegateTaskListener.onFailure(eq(task), eq(exception));
+        expectLastCall();
+
+        taskStartupAttempts.record(eq(1.0));
+        expectLastCall();
+        taskStartupFailures.record(eq(1.0));
+        expectLastCall();
+        taskStartupResults.record(eq(0.0));
+        expectLastCall();
+
+        PowerMock.replayAll();
+
+        WorkerMetricsGroup workerMetricsGroup = new WorkerMetricsGroup(new HashMap<>(), new HashMap<>(), connectMetrics);
+        final TaskStatus.Listener taskListener = workerMetricsGroup.wrapStatusListener(delegateTaskListener);
+
+        taskListener.onFailure(task, exception);
+
+        PowerMock.verifyAll();
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index c2beb66..43a1099 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -633,6 +633,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         workerTask.execute();
 
         assertEquals(0, workerTask.commitFailures());
@@ -996,6 +997,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         try {
             workerTask.execute();
             fail("workerTask.execute should have thrown an exception");
@@ -1035,6 +1037,7 @@ public class WorkerSinkTaskTest {
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
         try {
             workerTask.execute();
             fail("workerTask.execute should have thrown an exception");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index c26a88c..807bf84 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -93,6 +93,7 @@ public class WorkerTaskTest {
                 .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
                         retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
                 .addMockedMethod("initialize")
+                .addMockedMethod("initializeAndStart")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
@@ -100,6 +101,9 @@ public class WorkerTaskTest {
         workerTask.initialize(TASK_CONFIG);
         expectLastCall();
 
+        workerTask.initializeAndStart();
+        expectLastCall();
+
         workerTask.execute();
         expectLastCall();
 
@@ -180,6 +184,7 @@ public class WorkerTaskTest {
                 .withArgs(taskId, statusListener, TargetState.STARTED, loader, metrics,
                         retryWithToleranceOperator, Time.SYSTEM, statusBackingStore)
                 .addMockedMethod("initialize")
+                .addMockedMethod("initializeAndStart")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
@@ -198,6 +203,9 @@ public class WorkerTaskTest {
         workerTask.initialize(TASK_CONFIG);
         EasyMock.expectLastCall();
 
+        workerTask.initializeAndStart();
+        EasyMock.expectLastCall();
+
         workerTask.execute();
         expectLastCall().andAnswer(new IAnswer<Void>() {
             @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index cad0cf4..45f1cff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -25,9 +25,9 @@ import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.config.provider.MockFileConfigProvider;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.ConnectorContext;
@@ -677,20 +677,16 @@ public class WorkerTest extends ThreadedTest {
         worker.herder = herder;
         worker.start();
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
 
         PowerMock.verifyAll();
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
index fe0005b..7e0916d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerWithTopicCreationTest.java
@@ -672,16 +672,17 @@ public class WorkerWithTopicCreationTest extends ThreadedTest {
         assertEquals(Collections.emptySet(), worker.taskIds());
         worker.startTask(TASK_ID, ClusterConfigState.EMPTY, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertStatistics(worker, 0, 1);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
+        // no task starts because we've mocked out the executor that starts the task and updates the stats
+        assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
+        assertStartupStatistics(worker, 0, 0, 0, 0);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
         assertStatistics(worker, 0, 0);
-        assertStartupStatistics(worker, 0, 0, 1, 0);
+        assertStartupStatistics(worker, 0, 0, 0, 0);
 
         PowerMock.verifyAll();
     }
