merlimat closed pull request #1568: moving function worker service to pulsar 
service
URL: https://github.com/apache/incubator-pulsar/pull/1568
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 42d90e271..2a0e1a22e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -225,16 +225,6 @@ public void start() throws Exception {
 
             pulsarService.start();
             log.info("PulsarService started.");
-
-            // after broker is started, start the functions worker
-            if (null != functionsWorkerService) {
-                try {
-                    functionsWorkerService.start();
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw ie;
-                }
-            }
         }
 
         public void join() throws InterruptedException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index a5d0a43e4..84b3723ca 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -44,7 +44,6 @@
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.ea.agentloader.AgentLoader;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class PulsarStandaloneStarter {
@@ -259,10 +258,6 @@ void start() throws Exception {
             log.info(e.getMessage());
         }
 
-        if (null != fnWorkerService) {
-            fnWorkerService.start();
-        }
-
         log.debug("--- setup completed ---");
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e5623d4c5..0c6d38b56 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -19,13 +19,15 @@
 package org.apache.pulsar.broker;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -38,11 +40,18 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
+
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -72,6 +81,7 @@
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
@@ -84,15 +94,21 @@
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
+
 /**
  * Main class for Pulsar broker service
  */
+
 public class PulsarService implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarService.class);
     private ServiceConfiguration config = null;
@@ -390,6 +406,12 @@ public synchronized void brokerIsAFollowerNow() {
             });
 
             leaderElectionService.start();
+
+            // start function worker service if necessary
+            this.startWorkerService();
+
+            LOG.info("Starting Pulsar Broker service; version: '{}'", ( 
brokerVersion != null ? brokerVersion : "unknown" )  );
+
             webService.start();
 
             this.metricsGenerator = new MetricsGenerator(this);
@@ -785,4 +807,102 @@ public String getBrokerVersion() {
     public SchemaRegistryService getSchemaRegistryService() {
         return schemaRegistryService;
     }
+
+    private void startWorkerService() throws InterruptedException, 
IOException, KeeperException {
+        if (functionWorkerService.isPresent()) {
+            LOG.info("Starting function worker service");
+            String namespace = functionWorkerService.get()
+                    .getWorkerConfig().getPulsarFunctionsNamespace();
+            String[] a = 
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
+            String property = a[0];
+            String cluster = a[1];
+
+                /*
+                multiple brokers may be trying to create the property, 
cluster, and namespace
+                for function worker service this in parallel. The function 
worker service uses the namespace
+                to create topics for internal function
+                */
+
+            // create property for function worker service
+            try {
+                NamedEntity.checkName(property);
+                this.getGlobalZkCache().getZooKeeper().create(
+                        AdminResource.path(POLICIES, property),
+                        ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
+                                new PropertyAdmin(
+                                        
Sets.newHashSet(config.getSuperUserRoles()),
+                                        Sets.newHashSet(cluster))),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Created property {} for function worker", property);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing property {} for 
function worker service", cluster, e);
+            } catch (IllegalArgumentException e) {
+                LOG.error("Failed to create property with invalid name {} for 
function worker service", cluster, e);
+                throw e;
+            } catch (Exception e) {
+                LOG.error("Failed to create property {} for function worker", 
cluster, e);
+                throw e;
+            }
+
+            // create cluster for function worker service
+            try {
+                NamedEntity.checkName(cluster);
+                ClusterData clusterData = new 
ClusterData(this.getWebServiceAddress(), null /* serviceUrlTls */,
+                        brokerServiceUrl, null /* brokerServiceUrlTls */);
+                this.getGlobalZkCache().getZooKeeper().create(
+                        AdminResource.path("clusters", cluster),
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Created cluster {} for function worker", cluster);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing cluster {} for 
function worker service", cluster, e);
+            } catch (IllegalArgumentException e) {
+                LOG.error("Failed to create cluster with invalid name {} for 
function worker service", cluster, e);
+                throw e;
+            } catch (Exception e) {
+                LOG.error("Failed to create cluster {} for function worker 
service", cluster, e);
+                throw e;
+            }
+
+            // create namespace for function worker service
+            try {
+                Policies policies = new Policies();
+                int defaultNumberOfBundles = 
this.getConfiguration().getDefaultNumberOfNamespaceBundles();
+                policies.bundles = getBundles(defaultNumberOfBundles);
+
+                
this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES,
 namespace));
+                
ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(),
+                        AdminResource.path(POLICIES, namespace),
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                LOG.info("Created namespace {} for function worker service", 
namespace);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing namespace {} for 
function worker service", namespace);
+            } catch (Exception e) {
+                LOG.error("Failed to create namespace {}", namespace, e);
+                throw e;
+            }
+
+            InternalConfigurationData internalConf = new 
InternalConfigurationData(
+                    this.getConfiguration().getZookeeperServers(),
+                    this.getConfiguration().getGlobalZookeeperServers(),
+                    new ClientConfiguration().getZkLedgersRootPath());
+
+            URI dlogURI;
+            try {
+                // initializing dlog namespace for function worker
+                dlogURI = Utils.initializeDlogNamespace(
+                        internalConf.getZookeeperServers(),
+                        internalConf.getLedgersRootPath());
+            } catch (IOException ioe) {
+                LOG.error("Failed to initialize dlog namespace at zookeeper {} 
for storing function packages",
+                        internalConf.getZookeeperServers(), ioe);
+                throw ioe;
+            }
+            LOG.info("Function worker service setup completed");
+            functionWorkerService.get().start(dlogURI);
+            LOG.info("Function worker service started");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0fe5fd877..e7502886f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1248,7 +1248,7 @@ protected BundlesData validateBundlesData(BundlesData 
initialBundles) {
         return new BundlesData(bundles);
     }
 
-    protected BundlesData getBundles(int numBundles) {
+    public static BundlesData getBundles(int numBundles) {
         if (numBundles <= 0 || numBundles > MAX_BUNDLES) {
             throw new RestException(Status.BAD_REQUEST,
                     "Invalid number of bundles. Number of numbles has to be in 
the range of (0, 2^32].");
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
deleted file mode 100644
index 1d606dedc..000000000
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import java.io.IOException;
-import java.net.URI;
-import javax.ws.rs.core.Response;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.conf.InternalConfigurationData;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-
-/**
- * Tools to setup function metadata.
- */
-@Slf4j
-public class FunctionMetadataSetup {
-
-    /**
-     * Setup function metadata.
-     *
-     * @param workerConfig worker config
-     * @return dlog uri for store function jars
-     * @throws InterruptedException interrupted at setting up metadata
-     * @throws PulsarAdminException when encountering exception on pulsar 
admin operation
-     * @throws IOException when create dlog namespace for storing function 
jars.
-     */
-    public static URI setupFunctionMetadata(WorkerConfig workerConfig)
-            throws InterruptedException, PulsarAdminException, IOException {
-        // initializing pulsar functions namespace
-        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
-        InternalConfigurationData internalConf;
-        // make sure pulsar broker is up
-        log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
-        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
-        int retries = 0;
-        while (true) {
-            try {
-                admin.clusters().getClusters();
-                break;
-            } catch (PulsarAdminException e) {
-                log.warn("Failed to retrieve clusters from pulsar service", e);
-                log.warn("Retry to connect to Pulsar service at {}", 
workerConfig.getPulsarWebServiceUrl());
-                if (retries >= maxRetries) {
-                    log.error("Failed to connect to Pulsar service at {} after 
{} attempts",
-                            workerConfig.getPulsarFunctionsNamespace(), 
maxRetries);
-                    throw e;
-                }
-                retries ++;
-                Thread.sleep(1000);
-            }
-        }
-
-        // getting namespace policy
-        log.info("Initializing Pulsar Functions namespace...");
-        try {
-            try {
-                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
-            } catch (PulsarAdminException e) {
-                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-                    // if not found than create
-                    try {
-                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace());
-                    } catch (PulsarAdminException e1) {
-                        // prevent race condition with other workers starting 
up
-                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
-                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
-                                .getPulsarFunctionsNamespace(), e1);
-                            throw e1;
-                        }
-                    }
-                    try {
-                        admin.namespaces().setRetention(
-                            workerConfig.getPulsarFunctionsNamespace(),
-                            new RetentionPolicies(Integer.MAX_VALUE, 
Integer.MAX_VALUE));
-                    } catch (PulsarAdminException e1) {
-                        log.error("Failed to set retention policy for pulsar 
functions namespace", e);
-                        throw new RuntimeException(e1);
-                    }
-                } else {
-                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
-                        workerConfig.getPulsarFunctionsNamespace(), e);
-                    throw e;
-                }
-            }
-            try {
-                internalConf = admin.brokers().getInternalConfigurationData();
-            } catch (PulsarAdminException e) {
-                log.error("Failed to retrieve broker internal configuration", 
e);
-                throw e;
-            }
-        } finally {
-            admin.close();
-        }
-
-        // initialize the dlog namespace
-        // TODO: move this as part of pulsar cluster initialization later
-        try {
-            return Utils.initializeDlogNamespace(
-                internalConf.getZookeeperServers(),
-                internalConf.getLedgersRootPath());
-        } catch (IOException ioe) {
-            log.error("Failed to initialize dlog namespace at zookeeper {} for 
storing function packages",
-                    internalConf.getZookeeperServers(), ioe);
-            throw ioe;
-        }
-    }
-
-}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 09f9fc0fb..a7fd982a4 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -21,8 +21,16 @@
 import com.google.common.util.concurrent.AbstractService;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
 
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+
 @Slf4j
 public class Worker extends AbstractService {
 
@@ -47,8 +55,10 @@ protected void doStart() {
         }
     }
 
-    protected void doStartImpl() throws InterruptedException {
-        workerService.start();
+    protected void doStartImpl() throws InterruptedException, IOException, 
PulsarAdminException {
+        URI dlogUri = initialize(this.workerConfig);
+
+        workerService.start(dlogUri);
         WorkerServer server = new WorkerServer(workerService);
         this.serverThread = new Thread(server, server.getThreadName());
 
@@ -56,6 +66,87 @@ protected void doStartImpl() throws InterruptedException {
         this.serverThread.start();
     }
 
+    private static URI initialize(WorkerConfig workerConfig)
+            throws InterruptedException, PulsarAdminException, IOException {
+        // initializing pulsar functions namespace
+        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
+        InternalConfigurationData internalConf;
+        // make sure pulsar broker is up
+        log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
+        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
+        int retries = 0;
+        while (true) {
+            try {
+                admin.clusters().getClusters();
+                break;
+            } catch (PulsarAdminException e) {
+                log.warn("Failed to retrieve clusters from pulsar service", e);
+                log.warn("Retry to connect to Pulsar service at {}", 
workerConfig.getPulsarWebServiceUrl());
+                if (retries >= maxRetries) {
+                    log.error("Failed to connect to Pulsar service at {} after 
{} attempts",
+                            workerConfig.getPulsarFunctionsNamespace(), 
maxRetries);
+                    throw e;
+                }
+                retries ++;
+                Thread.sleep(1000);
+            }
+        }
+
+        // getting namespace policy
+        log.info("Initializing Pulsar Functions namespace...");
+        try {
+            try {
+                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
+                    // if not found than create
+                    try {
+                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace());
+                    } catch (PulsarAdminException e1) {
+                        // prevent race condition with other workers starting 
up
+                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
+                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
+                                    .getPulsarFunctionsNamespace(), e1);
+                            throw e1;
+                        }
+                    }
+                    try {
+                        admin.namespaces().setRetention(
+                                workerConfig.getPulsarFunctionsNamespace(),
+                                new RetentionPolicies(Integer.MAX_VALUE, 
Integer.MAX_VALUE));
+                    } catch (PulsarAdminException e1) {
+                        log.error("Failed to set retention policy for pulsar 
functions namespace", e);
+                        throw new RuntimeException(e1);
+                    }
+                } else {
+                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
+                            workerConfig.getPulsarFunctionsNamespace(), e);
+                    throw e;
+                }
+            }
+            try {
+                internalConf = admin.brokers().getInternalConfigurationData();
+            } catch (PulsarAdminException e) {
+                log.error("Failed to retrieve broker internal configuration", 
e);
+                throw e;
+            }
+        } finally {
+            admin.close();
+        }
+
+        // initialize the dlog namespace
+        // TODO: move this as part of pulsar cluster initialization later
+        try {
+            return Utils.initializeDlogNamespace(
+                    internalConf.getZookeeperServers(),
+                    internalConf.getLedgersRootPath());
+        } catch (IOException ioe) {
+            log.error("Failed to initialize dlog namespace at zookeeper {} for 
storing function packages",
+                    internalConf.getZookeeperServers(), ioe);
+            throw ioe;
+        }
+    }
+
     @Override
     protected void doStop() {
         if (null != serverThread) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 20ffd8d94..5df3c3ff0 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,22 +20,14 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import java.net.URI;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.Resources;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.servlet.ServletContainer;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -58,15 +50,7 @@ public WorkerService(WorkerConfig workerConfig) {
         this.workerConfig = workerConfig;
     }
 
-    public void start() throws InterruptedException {
-        try {
-            start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
-        } catch (PulsarAdminException | IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void start(URI dlogUri) throws InterruptedException {
+    public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
         try {
             log.info("Worker Configs: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to