This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit af654c419d7c75028c1045dccaee1dcc4e2c26bc Author: Boyang Jerry Peng <[email protected]> AuthorDate: Tue Jul 14 14:02:29 2020 -0700 Fix: function BC issue introduced in 2.6 (#7528) Co-authored-by: Jerry Peng <[email protected]> (cherry picked from commit eef63deeeee897d67b2f3a781c4bbf8dbbe17683) --- .../pulsar/functions/worker/FunctionWorkerStarter.java | 7 ++++--- .../org/apache/pulsar/functions/worker/WorkerUtils.java | 14 +++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java index 90ee762..e33787f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionWorkerStarter.java @@ -51,7 +51,7 @@ public class FunctionWorkerStarter { if (workerArguments.help) { commander.usage(); - System.exit(-1); + System.exit(1); return; } @@ -66,11 +66,12 @@ public class FunctionWorkerStarter { try { worker.start(); } catch (Throwable th) { + log.error("Encountered error in function worker.", th); worker.stop(); - System.exit(-1); + Runtime.getRuntime().halt(1); } Runtime.getRuntime().addShutdownHook(new Thread(() -> { - log.info("Stopping function worker service .."); + log.info("Stopping function worker service..."); worker.stop(); })); } diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java index 5cad320..fc4d5f8 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerUtils.java @@ -154,9 +154,17 @@ public final class WorkerUtils { public static URI initializeDlogNamespace(InternalConfigurationData internalConf) throws IOException { String zookeeperServers = internalConf.getZookeeperServers(); - URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri()); - String ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ","); - String ledgersRootPath = metadataServiceUri.getPath(); + String ledgersRootPath; + String ledgersStoreServers; + // for BC purposes + if (internalConf.getBookkeeperMetadataServiceUri() == null) { + ledgersRootPath = internalConf.getLedgersRootPath(); + ledgersStoreServers = zookeeperServers; + } else { + URI metadataServiceUri = URI.create(internalConf.getBookkeeperMetadataServiceUri()); + ledgersStoreServers = metadataServiceUri.getAuthority().replace(";", ","); + ledgersRootPath = metadataServiceUri.getPath(); + } BKDLConfig dlConfig = new BKDLConfig(ledgersStoreServers, ledgersRootPath); DLMetadata dlMetadata = DLMetadata.create(dlConfig); URI dlogUri = URI.create(String.format("distributedlog://%s/pulsar/functions", zookeeperServers));
