This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aba0d42  moving function worker service to pulsar service (#1568)
aba0d42 is described below

commit aba0d4247991c68d8d7bce21eaaeba4cb5cb1fb8
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Fri Apr 13 12:48:38 2018 -0700

    moving function worker service to pulsar service (#1568)
    
    * moving function worker service to pulsar service
    
    * moving function worker service start routine to separate method
---
 .../org/apache/pulsar/PulsarBrokerStarter.java     |  10 --
 .../org/apache/pulsar/PulsarStandaloneStarter.java |   5 -
 .../org/apache/pulsar/broker/PulsarService.java    | 124 +++++++++++++++++++-
 .../pulsar/broker/admin/impl/NamespacesBase.java   |   2 +-
 .../functions/worker/FunctionMetadataSetup.java    | 126 ---------------------
 .../org/apache/pulsar/functions/worker/Worker.java |  95 +++++++++++++++-
 .../pulsar/functions/worker/WorkerService.java     |  18 +--
 7 files changed, 217 insertions(+), 163 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
index 42d90e2..2a0e1a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java
@@ -225,16 +225,6 @@ public class PulsarBrokerStarter {
 
             pulsarService.start();
             log.info("PulsarService started.");
-
-            // after broker is started, start the functions worker
-            if (null != functionsWorkerService) {
-                try {
-                    functionsWorkerService.start();
-                } catch (InterruptedException ie) {
-                    Thread.currentThread().interrupt();
-                    throw ie;
-                }
-            }
         }
 
         public void join() throws InterruptedException {
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index a5d0a43..84b3723 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.ea.agentloader.AgentLoader;
-import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
 public class PulsarStandaloneStarter {
@@ -259,10 +258,6 @@ public class PulsarStandaloneStarter {
             log.info(e.getMessage());
         }
 
-        if (null != fnWorkerService) {
-            fnWorkerService.start();
-        }
-
         log.debug("--- setup completed ---");
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index e5623d4..0c6d38b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -19,13 +19,15 @@
 package org.apache.pulsar.broker;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles;
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
-import java.net.URL;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -38,11 +40,18 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Supplier;
+
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.common.util.OrderedExecutor;
-import org.apache.bookkeeper.common.util.OrderedScheduler;
+import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
+import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.naming.NamedEntity;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.compaction.Compactor;
 import org.apache.pulsar.compaction.TwoPhaseCompactor;
 import org.apache.pulsar.broker.admin.AdminResource;
@@ -72,6 +81,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.functions.worker.Utils;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils;
 import org.apache.pulsar.websocket.WebSocketConsumerServlet;
@@ -84,15 +94,21 @@ import 
org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService;
 import org.apache.pulsar.zookeeper.ZooKeeperCache;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.websocket.servlet.WebSocketServlet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.core.Response;
+
 /**
  * Main class for Pulsar broker service
  */
+
 public class PulsarService implements AutoCloseable {
     private static final Logger LOG = 
LoggerFactory.getLogger(PulsarService.class);
     private ServiceConfiguration config = null;
@@ -390,6 +406,12 @@ public class PulsarService implements AutoCloseable {
             });
 
             leaderElectionService.start();
+
+            // start function worker service if necessary
+            this.startWorkerService();
+
+            LOG.info("Starting Pulsar Broker service; version: '{}'", ( 
brokerVersion != null ? brokerVersion : "unknown" )  );
+
             webService.start();
 
             this.metricsGenerator = new MetricsGenerator(this);
@@ -785,4 +807,102 @@ public class PulsarService implements AutoCloseable {
     public SchemaRegistryService getSchemaRegistryService() {
         return schemaRegistryService;
     }
+
+    private void startWorkerService() throws InterruptedException, 
IOException, KeeperException {
+        if (functionWorkerService.isPresent()) {
+            LOG.info("Starting function worker service");
+            String namespace = functionWorkerService.get()
+                    .getWorkerConfig().getPulsarFunctionsNamespace();
+            String[] a = 
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
+            String property = a[0];
+            String cluster = a[1];
+
+                /*
+                multiple brokers may be trying to create the property, 
cluster, and namespace
+                for function worker service this in parallel. The function 
worker service uses the namespace
+                to create topics for internal function
+                */
+
+            // create property for function worker service
+            try {
+                NamedEntity.checkName(property);
+                this.getGlobalZkCache().getZooKeeper().create(
+                        AdminResource.path(POLICIES, property),
+                        ObjectMapperFactory.getThreadLocal().writeValueAsBytes(
+                                new PropertyAdmin(
+                                        
Sets.newHashSet(config.getSuperUserRoles()),
+                                        Sets.newHashSet(cluster))),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Created property {} for function worker", property);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing property {} for 
function worker service", cluster, e);
+            } catch (IllegalArgumentException e) {
+                LOG.error("Failed to create property with invalid name {} for 
function worker service", cluster, e);
+                throw e;
+            } catch (Exception e) {
+                LOG.error("Failed to create property {} for function worker", 
cluster, e);
+                throw e;
+            }
+
+            // create cluster for function worker service
+            try {
+                NamedEntity.checkName(cluster);
+                ClusterData clusterData = new 
ClusterData(this.getWebServiceAddress(), null /* serviceUrlTls */,
+                        brokerServiceUrl, null /* brokerServiceUrlTls */);
+                this.getGlobalZkCache().getZooKeeper().create(
+                        AdminResource.path("clusters", cluster),
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                LOG.info("Created cluster {} for function worker", cluster);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing cluster {} for 
function worker service", cluster, e);
+            } catch (IllegalArgumentException e) {
+                LOG.error("Failed to create cluster with invalid name {} for 
function worker service", cluster, e);
+                throw e;
+            } catch (Exception e) {
+                LOG.error("Failed to create cluster {} for function worker 
service", cluster, e);
+                throw e;
+            }
+
+            // create namespace for function worker service
+            try {
+                Policies policies = new Policies();
+                int defaultNumberOfBundles = 
this.getConfiguration().getDefaultNumberOfNamespaceBundles();
+                policies.bundles = getBundles(defaultNumberOfBundles);
+
+                
this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES,
 namespace));
+                
ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(),
+                        AdminResource.path(POLICIES, namespace),
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                LOG.info("Created namespace {} for function worker service", 
namespace);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing namespace {} for 
function worker service", namespace);
+            } catch (Exception e) {
+                LOG.error("Failed to create namespace {}", namespace, e);
+                throw e;
+            }
+
+            InternalConfigurationData internalConf = new 
InternalConfigurationData(
+                    this.getConfiguration().getZookeeperServers(),
+                    this.getConfiguration().getGlobalZookeeperServers(),
+                    new ClientConfiguration().getZkLedgersRootPath());
+
+            URI dlogURI;
+            try {
+                // initializing dlog namespace for function worker
+                dlogURI = Utils.initializeDlogNamespace(
+                        internalConf.getZookeeperServers(),
+                        internalConf.getLedgersRootPath());
+            } catch (IOException ioe) {
+                LOG.error("Failed to initialize dlog namespace at zookeeper {} 
for storing function packages",
+                        internalConf.getZookeeperServers(), ioe);
+                throw ioe;
+            }
+            LOG.info("Function worker service setup completed");
+            functionWorkerService.get().start(dlogURI);
+            LOG.info("Function worker service started");
+        }
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 0fe5fd8..e750288 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -1248,7 +1248,7 @@ public abstract class NamespacesBase extends 
AdminResource {
         return new BundlesData(bundles);
     }
 
-    protected BundlesData getBundles(int numBundles) {
+    public static BundlesData getBundles(int numBundles) {
         if (numBundles <= 0 || numBundles > MAX_BUNDLES) {
             throw new RestException(Status.BAD_REQUEST,
                     "Invalid number of bundles. Number of numbles has to be in 
the range of (0, 2^32].");
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
deleted file mode 100644
index 1d606de..0000000
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.worker;
-
-import java.io.IOException;
-import java.net.URI;
-import javax.ws.rs.core.Response;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.common.conf.InternalConfigurationData;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-
-/**
- * Tools to setup function metadata.
- */
-@Slf4j
-public class FunctionMetadataSetup {
-
-    /**
-     * Setup function metadata.
-     *
-     * @param workerConfig worker config
-     * @return dlog uri for store function jars
-     * @throws InterruptedException interrupted at setting up metadata
-     * @throws PulsarAdminException when encountering exception on pulsar 
admin operation
-     * @throws IOException when create dlog namespace for storing function 
jars.
-     */
-    public static URI setupFunctionMetadata(WorkerConfig workerConfig)
-            throws InterruptedException, PulsarAdminException, IOException {
-        // initializing pulsar functions namespace
-        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
-        InternalConfigurationData internalConf;
-        // make sure pulsar broker is up
-        log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
-        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
-        int retries = 0;
-        while (true) {
-            try {
-                admin.clusters().getClusters();
-                break;
-            } catch (PulsarAdminException e) {
-                log.warn("Failed to retrieve clusters from pulsar service", e);
-                log.warn("Retry to connect to Pulsar service at {}", 
workerConfig.getPulsarWebServiceUrl());
-                if (retries >= maxRetries) {
-                    log.error("Failed to connect to Pulsar service at {} after 
{} attempts",
-                            workerConfig.getPulsarFunctionsNamespace(), 
maxRetries);
-                    throw e;
-                }
-                retries ++;
-                Thread.sleep(1000);
-            }
-        }
-
-        // getting namespace policy
-        log.info("Initializing Pulsar Functions namespace...");
-        try {
-            try {
-                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
-            } catch (PulsarAdminException e) {
-                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
-                    // if not found than create
-                    try {
-                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace());
-                    } catch (PulsarAdminException e1) {
-                        // prevent race condition with other workers starting 
up
-                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
-                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
-                                .getPulsarFunctionsNamespace(), e1);
-                            throw e1;
-                        }
-                    }
-                    try {
-                        admin.namespaces().setRetention(
-                            workerConfig.getPulsarFunctionsNamespace(),
-                            new RetentionPolicies(Integer.MAX_VALUE, 
Integer.MAX_VALUE));
-                    } catch (PulsarAdminException e1) {
-                        log.error("Failed to set retention policy for pulsar 
functions namespace", e);
-                        throw new RuntimeException(e1);
-                    }
-                } else {
-                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
-                        workerConfig.getPulsarFunctionsNamespace(), e);
-                    throw e;
-                }
-            }
-            try {
-                internalConf = admin.brokers().getInternalConfigurationData();
-            } catch (PulsarAdminException e) {
-                log.error("Failed to retrieve broker internal configuration", 
e);
-                throw e;
-            }
-        } finally {
-            admin.close();
-        }
-
-        // initialize the dlog namespace
-        // TODO: move this as part of pulsar cluster initialization later
-        try {
-            return Utils.initializeDlogNamespace(
-                internalConf.getZookeeperServers(),
-                internalConf.getLedgersRootPath());
-        } catch (IOException ioe) {
-            log.error("Failed to initialize dlog namespace at zookeeper {} for 
storing function packages",
-                    internalConf.getZookeeperServers(), ioe);
-            throw ioe;
-        }
-    }
-
-}
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
index 09f9fc0..a7fd982 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java
@@ -21,8 +21,16 @@ package org.apache.pulsar.functions.worker;
 import com.google.common.util.concurrent.AbstractService;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.conf.InternalConfigurationData;
+import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.functions.worker.rest.WorkerServer;
 
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.net.URI;
+
 @Slf4j
 public class Worker extends AbstractService {
 
@@ -47,8 +55,10 @@ public class Worker extends AbstractService {
         }
     }
 
-    protected void doStartImpl() throws InterruptedException {
-        workerService.start();
+    protected void doStartImpl() throws InterruptedException, IOException, 
PulsarAdminException {
+        URI dlogUri = initialize(this.workerConfig);
+
+        workerService.start(dlogUri);
         WorkerServer server = new WorkerServer(workerService);
         this.serverThread = new Thread(server, server.getThreadName());
 
@@ -56,6 +66,87 @@ public class Worker extends AbstractService {
         this.serverThread.start();
     }
 
+    private static URI initialize(WorkerConfig workerConfig)
+            throws InterruptedException, PulsarAdminException, IOException {
+        // initializing pulsar functions namespace
+        PulsarAdmin admin = 
Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl());
+        InternalConfigurationData internalConf;
+        // make sure pulsar broker is up
+        log.info("Checking if pulsar service at {} is up...", 
workerConfig.getPulsarWebServiceUrl());
+        int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries();
+        int retries = 0;
+        while (true) {
+            try {
+                admin.clusters().getClusters();
+                break;
+            } catch (PulsarAdminException e) {
+                log.warn("Failed to retrieve clusters from pulsar service", e);
+                log.warn("Retry to connect to Pulsar service at {}", 
workerConfig.getPulsarWebServiceUrl());
+                if (retries >= maxRetries) {
+                    log.error("Failed to connect to Pulsar service at {} after 
{} attempts",
+                            workerConfig.getPulsarFunctionsNamespace(), 
maxRetries);
+                    throw e;
+                }
+                retries ++;
+                Thread.sleep(1000);
+            }
+        }
+
+        // getting namespace policy
+        log.info("Initializing Pulsar Functions namespace...");
+        try {
+            try {
+                
admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace());
+            } catch (PulsarAdminException e) {
+                if (e.getStatusCode() == 
Response.Status.NOT_FOUND.getStatusCode()) {
+                    // if not found than create
+                    try {
+                        
admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace());
+                    } catch (PulsarAdminException e1) {
+                        // prevent race condition with other workers starting 
up
+                        if (e1.getStatusCode() != 
Response.Status.CONFLICT.getStatusCode()) {
+                            log.error("Failed to create namespace {} for 
pulsar functions", workerConfig
+                                    .getPulsarFunctionsNamespace(), e1);
+                            throw e1;
+                        }
+                    }
+                    try {
+                        admin.namespaces().setRetention(
+                                workerConfig.getPulsarFunctionsNamespace(),
+                                new RetentionPolicies(Integer.MAX_VALUE, 
Integer.MAX_VALUE));
+                    } catch (PulsarAdminException e1) {
+                        log.error("Failed to set retention policy for pulsar 
functions namespace", e);
+                        throw new RuntimeException(e1);
+                    }
+                } else {
+                    log.error("Failed to get retention policy for pulsar 
function namespace {}",
+                            workerConfig.getPulsarFunctionsNamespace(), e);
+                    throw e;
+                }
+            }
+            try {
+                internalConf = admin.brokers().getInternalConfigurationData();
+            } catch (PulsarAdminException e) {
+                log.error("Failed to retrieve broker internal configuration", 
e);
+                throw e;
+            }
+        } finally {
+            admin.close();
+        }
+
+        // initialize the dlog namespace
+        // TODO: move this as part of pulsar cluster initialization later
+        try {
+            return Utils.initializeDlogNamespace(
+                    internalConf.getZookeeperServers(),
+                    internalConf.getLedgersRootPath());
+        } catch (IOException ioe) {
+            log.error("Failed to initialize dlog namespace at zookeeper {} for 
storing function packages",
+                    internalConf.getZookeeperServers(), ioe);
+            throw ioe;
+        }
+    }
+
     @Override
     protected void doStop() {
         if (null != serverThread) {
diff --git 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
index 20ffd8d..5df3c3f 100644
--- 
a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
+++ 
b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java
@@ -20,22 +20,14 @@ package org.apache.pulsar.functions.worker;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import java.io.IOException;
 import java.net.URI;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.DistributedLogConfiguration;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.distributedlog.api.namespace.NamespaceBuilder;
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.Resources;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.glassfish.jersey.server.ResourceConfig;
-import org.glassfish.jersey.servlet.ServletContainer;
 
 /**
  * A service component contains everything to run a worker except rest server.
@@ -58,15 +50,7 @@ public class WorkerService {
         this.workerConfig = workerConfig;
     }
 
-    public void start() throws InterruptedException {
-        try {
-            start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig));
-        } catch (PulsarAdminException | IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private void start(URI dlogUri) throws InterruptedException {
+    public void start(URI dlogUri) throws InterruptedException {
         log.info("Starting worker {}...", workerConfig.getWorkerId());
         try {
             log.info("Worker Configs: {}", new 
ObjectMapper().writerWithDefaultPrettyPrinter()

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to