This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 0e22fd6 KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609) 0e22fd6 is described below commit 0e22fd6f8d49e7884b7126d963af12ab305d5629 Author: Randall Hauch <rha...@gmail.com> AuthorDate: Fri Feb 23 00:29:49 2018 -0600 KAFKA-6578: Changed the Connect distributed and standalone main method to log all exceptions (#4609) Any exception thrown by calls within a `main()` method are not logged unless explicitly done so. This change simply adds a try-catch block around most of the content of the distributed and standalone `main()` methods. --- .../kafka/connect/cli/ConnectDistributed.java | 81 ++++++++++--------- .../kafka/connect/cli/ConnectStandalone.java | 93 ++++++++++++---------- 2 files changed, 94 insertions(+), 80 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java index 3b7ec87..54854fe4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java @@ -58,52 +58,59 @@ public class ConnectDistributed { Exit.exit(1); } - Time time = Time.SYSTEM; - log.info("Kafka Connect distributed worker initializing ..."); - long initStart = time.hiResClockMs(); - WorkerInfo initInfo = new WorkerInfo(); - initInfo.logAll(); + try { + Time time = Time.SYSTEM; + log.info("Kafka Connect distributed worker initializing ..."); + long initStart = time.hiResClockMs(); + WorkerInfo initInfo = new WorkerInfo(); + initInfo.logAll(); - String workerPropsFile = args[0]; - Map<String, String> workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); + String workerPropsFile = args[0]; + Map<String, String> workerProps = !workerPropsFile.isEmpty() ? + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); - log.info("Scanning for plugin classes. This might take a moment ..."); - Plugins plugins = new Plugins(workerProps); - plugins.compareAndSwapWithDelegatingLoader(); - DistributedConfig config = new DistributedConfig(workerProps); + log.info("Scanning for plugin classes. This might take a moment ..."); + Plugins plugins = new Plugins(workerProps); + plugins.compareAndSwapWithDelegatingLoader(); + DistributedConfig config = new DistributedConfig(workerProps); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); - log.debug("Kafka cluster ID: {}", kafkaClusterId); + String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + log.debug("Kafka cluster ID: {}", kafkaClusterId); - RestServer rest = new RestServer(config); - URI advertisedUrl = rest.advertisedUrl(); - String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); + RestServer rest = new RestServer(config); + URI advertisedUrl = rest.advertisedUrl(); + String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); - offsetBackingStore.configure(config); + KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore(); + offsetBackingStore.configure(config); - Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); + Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore); - Converter internalValueConverter = worker.getInternalValueConverter(); - StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); - statusBackingStore.configure(config); + Converter internalValueConverter = worker.getInternalValueConverter(); + StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter); + statusBackingStore.configure(config); - ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config); + ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(internalValueConverter, config); - DistributedHerder herder = new DistributedHerder(config, time, worker, - kafkaClusterId, statusBackingStore, configBackingStore, - advertisedUrl.toString()); - final Connect connect = new Connect(herder, rest); - log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); - try { - connect.start(); - } catch (Exception e) { - log.error("Failed to start Connect", e); - connect.stop(); - } + DistributedHerder herder = new DistributedHerder(config, time, worker, + kafkaClusterId, statusBackingStore, configBackingStore, + advertisedUrl.toString()); + final Connect connect = new Connect(herder, rest); + log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart); + try { + connect.start(); + } catch (Exception e) { + log.error("Failed to start Connect", e); + connect.stop(); + Exit.exit(3); + } - // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request - connect.awaitStop(); + // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request + connect.awaitStop(); + + } catch (Throwable t) { + log.error("Stopping due to error", t); + Exit.exit(2); + } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java index 413cb46..aba9d9c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java @@ -62,58 +62,65 @@ public class ConnectStandalone { Exit.exit(1); } - Time time = Time.SYSTEM; - log.info("Kafka Connect standalone worker initializing ..."); - long initStart = time.hiResClockMs(); - WorkerInfo initInfo = new WorkerInfo(); - initInfo.logAll(); + try { + Time time = Time.SYSTEM; + log.info("Kafka Connect standalone worker initializing ..."); + long initStart = time.hiResClockMs(); + WorkerInfo initInfo = new WorkerInfo(); + initInfo.logAll(); - String workerPropsFile = args[0]; - Map<String, String> workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); + String workerPropsFile = args[0]; + Map<String, String> workerProps = !workerPropsFile.isEmpty() ? + Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); - log.info("Scanning for plugin classes. This might take a moment ..."); - Plugins plugins = new Plugins(workerProps); - plugins.compareAndSwapWithDelegatingLoader(); - StandaloneConfig config = new StandaloneConfig(workerProps); + log.info("Scanning for plugin classes. This might take a moment ..."); + Plugins plugins = new Plugins(workerProps); + plugins.compareAndSwapWithDelegatingLoader(); + StandaloneConfig config = new StandaloneConfig(workerProps); - String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); - log.debug("Kafka cluster ID: {}", kafkaClusterId); + String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config); + log.debug("Kafka cluster ID: {}", kafkaClusterId); - RestServer rest = new RestServer(config); - URI advertisedUrl = rest.advertisedUrl(); - String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); + RestServer rest = new RestServer(config); + URI advertisedUrl = rest.advertisedUrl(); + String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort(); - Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore()); + Worker worker = new Worker(workerId, time, plugins, config, new FileOffsetBackingStore()); - Herder herder = new StandaloneHerder(worker, kafkaClusterId); - final Connect connect = new Connect(herder, rest); - log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart); + Herder herder = new StandaloneHerder(worker, kafkaClusterId); + final Connect connect = new Connect(herder, rest); + log.info("Kafka Connect standalone worker initialization took {}ms", time.hiResClockMs() - initStart); - try { - connect.start(); - for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { - Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); - FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() { - @Override - public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) { - if (error != null) - log.error("Failed to create job for {}", connectorPropsFile); - else - log.info("Created connector {}", info.result().name()); - } - }); - herder.putConnectorConfig( - connectorProps.get(ConnectorConfig.NAME_CONFIG), - connectorProps, false, cb); - cb.get(); + try { + connect.start(); + for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { + Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); + FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() { + @Override + public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) { + if (error != null) + log.error("Failed to create job for {}", connectorPropsFile); + else + log.info("Created connector {}", info.result().name()); + } + }); + herder.putConnectorConfig( + connectorProps.get(ConnectorConfig.NAME_CONFIG), + connectorProps, false, cb); + cb.get(); + } + } catch (Throwable t) { + log.error("Stopping after connector error", t); + connect.stop(); + Exit.exit(3); } + + // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request + connect.awaitStop(); + } catch (Throwable t) { - log.error("Stopping after connector error", t); - connect.stop(); + log.error("Stopping due to error", t); + Exit.exit(2); } - - // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request - connect.awaitStop(); } } -- To stop receiving notification emails like this one, please contact j...@apache.org.