[ 
https://issues.apache.org/jira/browse/KAFKA-6578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16373981#comment-16373981
 ] 

ASF GitHub Bot commented on KAFKA-6578:
---------------------------------------

hachikuji closed pull request #4609: KAFKA-6578: Changed the Connect 
distributed and standalone main method to log all exceptions
URL: https://github.com/apache/kafka/pull/4609
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 3b7ec87f644..54854fe4b80 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -58,52 +58,59 @@ public static void main(String[] args) throws Exception {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect distributed worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect distributed worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        DistributedConfig config = new DistributedConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment 
...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            DistributedConfig config = new DistributedConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
-        offsetBackingStore.configure(config);
+            KafkaOffsetBackingStore offsetBackingStore = new 
KafkaOffsetBackingStore();
+            offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore);
+            Worker worker = new Worker(workerId, time, plugins, config, 
offsetBackingStore);
 
-        Converter internalValueConverter = worker.getInternalValueConverter();
-        StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
-        statusBackingStore.configure(config);
+            Converter internalValueConverter = 
worker.getInternalValueConverter();
+            StatusBackingStore statusBackingStore = new 
KafkaStatusBackingStore(time, internalValueConverter);
+            statusBackingStore.configure(config);
 
-        ConfigBackingStore configBackingStore = new 
KafkaConfigBackingStore(internalValueConverter, config);
+            ConfigBackingStore configBackingStore = new 
KafkaConfigBackingStore(internalValueConverter, config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker,
-                kafkaClusterId, statusBackingStore, configBackingStore,
-                advertisedUrl.toString());
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect distributed worker initialization took {}ms", 
time.hiResClockMs() - initStart);
-        try {
-            connect.start();
-        } catch (Exception e) {
-            log.error("Failed to start Connect", e);
-            connect.stop();
-        }
+            DistributedHerder herder = new DistributedHerder(config, time, 
worker,
+                    kafkaClusterId, statusBackingStore, configBackingStore,
+                    advertisedUrl.toString());
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect distributed worker initialization took 
{}ms", time.hiResClockMs() - initStart);
+            try {
+                connect.start();
+            } catch (Exception e) {
+                log.error("Failed to start Connect", e);
+                connect.stop();
+                Exit.exit(3);
+            }
 
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown 
request
+            connect.awaitStop();
+
+        } catch (Throwable t) {
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
+        }
     }
 }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 413cb46cf28..aba9d9c32aa 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -62,58 +62,65 @@ public static void main(String[] args) throws Exception {
             Exit.exit(1);
         }
 
-        Time time = Time.SYSTEM;
-        log.info("Kafka Connect standalone worker initializing ...");
-        long initStart = time.hiResClockMs();
-        WorkerInfo initInfo = new WorkerInfo();
-        initInfo.logAll();
+        try {
+            Time time = Time.SYSTEM;
+            log.info("Kafka Connect standalone worker initializing ...");
+            long initStart = time.hiResClockMs();
+            WorkerInfo initInfo = new WorkerInfo();
+            initInfo.logAll();
 
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
+            String workerPropsFile = args[0];
+            Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
 
-        log.info("Scanning for plugin classes. This might take a moment ...");
-        Plugins plugins = new Plugins(workerProps);
-        plugins.compareAndSwapWithDelegatingLoader();
-        StandaloneConfig config = new StandaloneConfig(workerProps);
+            log.info("Scanning for plugin classes. This might take a moment 
...");
+            Plugins plugins = new Plugins(workerProps);
+            plugins.compareAndSwapWithDelegatingLoader();
+            StandaloneConfig config = new StandaloneConfig(workerProps);
 
-        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+            log.debug("Kafka cluster ID: {}", kafkaClusterId);
 
-        RestServer rest = new RestServer(config);
-        URI advertisedUrl = rest.advertisedUrl();
-        String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
+            RestServer rest = new RestServer(config);
+            URI advertisedUrl = rest.advertisedUrl();
+            String workerId = advertisedUrl.getHost() + ":" + 
advertisedUrl.getPort();
 
-        Worker worker = new Worker(workerId, time, plugins, config, new 
FileOffsetBackingStore());
+            Worker worker = new Worker(workerId, time, plugins, config, new 
FileOffsetBackingStore());
 
-        Herder herder = new StandaloneHerder(worker, kafkaClusterId);
-        final Connect connect = new Connect(herder, rest);
-        log.info("Kafka Connect standalone worker initialization took {}ms", 
time.hiResClockMs() - initStart);
+            Herder herder = new StandaloneHerder(worker, kafkaClusterId);
+            final Connect connect = new Connect(herder, rest);
+            log.info("Kafka Connect standalone worker initialization took 
{}ms", time.hiResClockMs() - initStart);
 
-        try {
-            connect.start();
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, 
args.length)) {
-                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
-                    @Override
-                    public void onCompletion(Throwable error, 
Herder.Created<ConnectorInfo> info) {
-                        if (error != null)
-                            log.error("Failed to create job for {}", 
connectorPropsFile);
-                        else
-                            log.info("Created connector {}", 
info.result().name());
-                    }
-                });
-                herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
-                cb.get();
+            try {
+                connect.start();
+                for (final String connectorPropsFile : 
Arrays.copyOfRange(args, 1, args.length)) {
+                    Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
+                    FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
+                        @Override
+                        public void onCompletion(Throwable error, 
Herder.Created<ConnectorInfo> info) {
+                            if (error != null)
+                                log.error("Failed to create job for {}", 
connectorPropsFile);
+                            else
+                                log.info("Created connector {}", 
info.result().name());
+                        }
+                    });
+                    herder.putConnectorConfig(
+                            connectorProps.get(ConnectorConfig.NAME_CONFIG),
+                            connectorProps, false, cb);
+                    cb.get();
+                }
+            } catch (Throwable t) {
+                log.error("Stopping after connector error", t);
+                connect.stop();
+                Exit.exit(3);
             }
+
+            // Shutdown will be triggered by Ctrl-C or via HTTP shutdown 
request
+            connect.awaitStop();
+
         } catch (Throwable t) {
-            log.error("Stopping after connector error", t);
-            connect.stop();
+            log.error("Stopping due to error", t);
+            Exit.exit(2);
         }
-
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        connect.awaitStop();
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Connect distributed and standalone worker 'main()' methods should catch and 
> log all exceptions
> ----------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6578
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6578
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.0.0
>            Reporter: Randall Hauch
>            Priority: Critical
>
> Currently, the {{main}} methods in {{ConnectDistributed}} and 
> {{ConnectStandalone}} do not catch and log most of the potential exceptions. 
> That means that when such an exception does occur, Java terminates the 
> process and reports the exception to stderr, but it never appears in the log.
> We should add a try block around most of the existing code in each main method 
> to catch any Throwable, log it, and either rethrow it or explicitly 
> exit with a non-zero return code.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to