This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new aba0d42 moving function worker service to pulsar service (#1568) aba0d42 is described below commit aba0d4247991c68d8d7bce21eaaeba4cb5cb1fb8 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Fri Apr 13 12:48:38 2018 -0700 moving function worker service to pulsar service (#1568) * moving function worker service to pulsar service * moving function worker service start routine to separate method --- .../org/apache/pulsar/PulsarBrokerStarter.java | 10 -- .../org/apache/pulsar/PulsarStandaloneStarter.java | 5 - .../org/apache/pulsar/broker/PulsarService.java | 124 +++++++++++++++++++- .../pulsar/broker/admin/impl/NamespacesBase.java | 2 +- .../functions/worker/FunctionMetadataSetup.java | 126 --------------------- .../org/apache/pulsar/functions/worker/Worker.java | 95 +++++++++++++++- .../pulsar/functions/worker/WorkerService.java | 18 +-- 7 files changed, 217 insertions(+), 163 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java index 42d90e2..2a0e1a2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarBrokerStarter.java @@ -225,16 +225,6 @@ public class PulsarBrokerStarter { pulsarService.start(); log.info("PulsarService started."); - - // after broker is started, start the functions worker - if (null != functionsWorkerService) { - try { - functionsWorkerService.start(); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw ie; - } - } } public void join() throws InterruptedException { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index a5d0a43..84b3723 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory; import com.beust.jcommander.JCommander; import com.beust.jcommander.Parameter; import com.ea.agentloader.AgentLoader; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; public class PulsarStandaloneStarter { @@ -259,10 +258,6 @@ public class PulsarStandaloneStarter { log.info(e.getMessage()); } - if (null != fnWorkerService) { - fnWorkerService.start(); - } - log.debug("--- setup completed ---"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index e5623d4..0c6d38b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -19,13 +19,15 @@ package org.apache.pulsar.broker; import static org.apache.commons.lang3.StringUtils.isNotBlank; +import static org.apache.pulsar.broker.admin.impl.NamespacesBase.getBundles; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Sets; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.IOException; -import java.net.URL; +import java.net.URI; import java.util.List; import java.util.Map; import java.util.Optional; @@ -38,11 +40,18 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; + import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedExecutor; -import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.builder.ReflectionToStringBuilder; +import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.naming.NamedEntity; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PropertyAdmin; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.compaction.Compactor; import org.apache.pulsar.compaction.TwoPhaseCompactor; import org.apache.pulsar.broker.admin.AdminResource; @@ -72,6 +81,7 @@ import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.functions.worker.Utils; import org.apache.pulsar.functions.worker.WorkerService; import org.apache.pulsar.utils.PulsarBrokerVersionStringUtils; import org.apache.pulsar.websocket.WebSocketConsumerServlet; @@ -84,15 +94,21 @@ import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; import org.apache.pulsar.zookeeper.ZooKeeperCache; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.websocket.servlet.WebSocketServlet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.ws.rs.core.Response; + /** * Main class for Pulsar broker service */ + public class PulsarService implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class); private ServiceConfiguration config = null; @@ -390,6 +406,12 @@ public class PulsarService implements AutoCloseable { }); leaderElectionService.start(); + + // start function worker service if necessary + this.startWorkerService(); + + LOG.info("Starting Pulsar Broker service; version: '{}'", ( brokerVersion != null ? brokerVersion : "unknown" ) ); + webService.start(); this.metricsGenerator = new MetricsGenerator(this); @@ -785,4 +807,102 @@ public class PulsarService implements AutoCloseable { public SchemaRegistryService getSchemaRegistryService() { return schemaRegistryService; } + + private void startWorkerService() throws InterruptedException, IOException, KeeperException { + if (functionWorkerService.isPresent()) { + LOG.info("Starting function worker service"); + String namespace = functionWorkerService.get() + .getWorkerConfig().getPulsarFunctionsNamespace(); + String[] a = functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/"); + String property = a[0]; + String cluster = a[1]; + + /* + multiple brokers may be trying to create the property, cluster, and namespace + for function worker service this in parallel. The function worker service uses the namespace + to create topics for internal function + */ + + // create property for function worker service + try { + NamedEntity.checkName(property); + this.getGlobalZkCache().getZooKeeper().create( + AdminResource.path(POLICIES, property), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes( + new PropertyAdmin( + Sets.newHashSet(config.getSuperUserRoles()), + Sets.newHashSet(cluster))), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Created property {} for function worker", property); + } catch (KeeperException.NodeExistsException e) { + LOG.debug("Failed to create already existing property {} for function worker service", cluster, e); + } catch (IllegalArgumentException e) { + LOG.error("Failed to create property with invalid name {} for function worker service", cluster, e); + throw e; + } catch (Exception e) { + LOG.error("Failed to create property {} for function worker", cluster, e); + throw e; + } + + // create cluster for function worker service + try { + NamedEntity.checkName(cluster); + ClusterData clusterData = new ClusterData(this.getWebServiceAddress(), null /* serviceUrlTls */, + brokerServiceUrl, null /* brokerServiceUrlTls */); + this.getGlobalZkCache().getZooKeeper().create( + AdminResource.path("clusters", cluster), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(clusterData), + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + LOG.info("Created cluster {} for function worker", cluster); + } catch (KeeperException.NodeExistsException e) { + LOG.debug("Failed to create already existing cluster {} for function worker service", cluster, e); + } catch (IllegalArgumentException e) { + LOG.error("Failed to create cluster with invalid name {} for function worker service", cluster, e); + throw e; + } catch (Exception e) { + LOG.error("Failed to create cluster {} for function worker service", cluster, e); + throw e; + } + + // create namespace for function worker service + try { + Policies policies = new Policies(); + int defaultNumberOfBundles = this.getConfiguration().getDefaultNumberOfNamespaceBundles(); + policies.bundles = getBundles(defaultNumberOfBundles); + + this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES, namespace)); + ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(), + AdminResource.path(POLICIES, namespace), + ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies), + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + LOG.info("Created namespace {} for function worker service", namespace); + } catch (KeeperException.NodeExistsException e) { + LOG.debug("Failed to create already existing namespace {} for function worker service", namespace); + } catch (Exception e) { + LOG.error("Failed to create namespace {}", namespace, e); + throw e; + } + + InternalConfigurationData internalConf = new InternalConfigurationData( + this.getConfiguration().getZookeeperServers(), + this.getConfiguration().getGlobalZookeeperServers(), + new ClientConfiguration().getZkLedgersRootPath()); + + URI dlogURI; + try { + // initializing dlog namespace for function worker + dlogURI = Utils.initializeDlogNamespace( + internalConf.getZookeeperServers(), + internalConf.getLedgersRootPath()); + } catch (IOException ioe) { + LOG.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", + internalConf.getZookeeperServers(), ioe); + throw ioe; + } + LOG.info("Function worker service setup completed"); + functionWorkerService.get().start(dlogURI); + LOG.info("Function worker service started"); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 0fe5fd8..e750288 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1248,7 +1248,7 @@ public abstract class NamespacesBase extends AdminResource { return new BundlesData(bundles); } - protected BundlesData getBundles(int numBundles) { + public static BundlesData getBundles(int numBundles) { if (numBundles <= 0 || numBundles > MAX_BUNDLES) { throw new RestException(Status.BAD_REQUEST, "Invalid number of bundles. Number of numbles has to be in the range of (0, 2^32]."); diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java deleted file mode 100644 index 1d606de..0000000 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetadataSetup.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.worker; - -import java.io.IOException; -import java.net.URI; -import javax.ws.rs.core.Response; -import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.admin.PulsarAdmin; -import org.apache.pulsar.client.admin.PulsarAdminException; -import org.apache.pulsar.common.conf.InternalConfigurationData; -import org.apache.pulsar.common.policies.data.RetentionPolicies; - -/** - * Tools to setup function metadata. - */ -@Slf4j -public class FunctionMetadataSetup { - - /** - * Setup function metadata. - * - * @param workerConfig worker config - * @return dlog uri for store function jars - * @throws InterruptedException interrupted at setting up metadata - * @throws PulsarAdminException when encountering exception on pulsar admin operation - * @throws IOException when create dlog namespace for storing function jars. - */ - public static URI setupFunctionMetadata(WorkerConfig workerConfig) - throws InterruptedException, PulsarAdminException, IOException { - // initializing pulsar functions namespace - PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl()); - InternalConfigurationData internalConf; - // make sure pulsar broker is up - log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl()); - int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); - int retries = 0; - while (true) { - try { - admin.clusters().getClusters(); - break; - } catch (PulsarAdminException e) { - log.warn("Failed to retrieve clusters from pulsar service", e); - log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl()); - if (retries >= maxRetries) { - log.error("Failed to connect to Pulsar service at {} after {} attempts", - workerConfig.getPulsarFunctionsNamespace(), maxRetries); - throw e; - } - retries ++; - Thread.sleep(1000); - } - } - - // getting namespace policy - log.info("Initializing Pulsar Functions namespace..."); - try { - try { - admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e) { - if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { - // if not found than create - try { - admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace()); - } catch (PulsarAdminException e1) { - // prevent race condition with other workers starting up - if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { - log.error("Failed to create namespace {} for pulsar functions", workerConfig - .getPulsarFunctionsNamespace(), e1); - throw e1; - } - } - try { - admin.namespaces().setRetention( - workerConfig.getPulsarFunctionsNamespace(), - new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); - } catch (PulsarAdminException e1) { - log.error("Failed to set retention policy for pulsar functions namespace", e); - throw new RuntimeException(e1); - } - } else { - log.error("Failed to get retention policy for pulsar function namespace {}", - workerConfig.getPulsarFunctionsNamespace(), e); - throw e; - } - } - try { - internalConf = admin.brokers().getInternalConfigurationData(); - } catch (PulsarAdminException e) { - log.error("Failed to retrieve broker internal configuration", e); - throw e; - } - } finally { - admin.close(); - } - - // initialize the dlog namespace - // TODO: move this as part of pulsar cluster initialization later - try { - return Utils.initializeDlogNamespace( - internalConf.getZookeeperServers(), - internalConf.getLedgersRootPath()); - } catch (IOException ioe) { - log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", - internalConf.getZookeeperServers(), ioe); - throw ioe; - } - } - -} diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java index 09f9fc0..a7fd982 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Worker.java @@ -21,8 +21,16 @@ package org.apache.pulsar.functions.worker; import com.google.common.util.concurrent.AbstractService; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.common.conf.InternalConfigurationData; +import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.functions.worker.rest.WorkerServer; +import javax.ws.rs.core.Response; +import java.io.IOException; +import java.net.URI; + @Slf4j public class Worker extends AbstractService { @@ -47,8 +55,10 @@ public class Worker extends AbstractService { } } - protected void doStartImpl() throws InterruptedException { - workerService.start(); + protected void doStartImpl() throws InterruptedException, IOException, PulsarAdminException { + URI dlogUri = initialize(this.workerConfig); + + workerService.start(dlogUri); WorkerServer server = new WorkerServer(workerService); this.serverThread = new Thread(server, server.getThreadName()); @@ -56,6 +66,87 @@ public class Worker extends AbstractService { this.serverThread.start(); } + private static URI initialize(WorkerConfig workerConfig) + throws InterruptedException, PulsarAdminException, IOException { + // initializing pulsar functions namespace + PulsarAdmin admin = Utils.getPulsarAdminClient(workerConfig.getPulsarWebServiceUrl()); + InternalConfigurationData internalConf; + // make sure pulsar broker is up + log.info("Checking if pulsar service at {} is up...", workerConfig.getPulsarWebServiceUrl()); + int maxRetries = workerConfig.getInitialBrokerReconnectMaxRetries(); + int retries = 0; + while (true) { + try { + admin.clusters().getClusters(); + break; + } catch (PulsarAdminException e) { + log.warn("Failed to retrieve clusters from pulsar service", e); + log.warn("Retry to connect to Pulsar service at {}", workerConfig.getPulsarWebServiceUrl()); + if (retries >= maxRetries) { + log.error("Failed to connect to Pulsar service at {} after {} attempts", + workerConfig.getPulsarFunctionsNamespace(), maxRetries); + throw e; + } + retries ++; + Thread.sleep(1000); + } + } + + // getting namespace policy + log.info("Initializing Pulsar Functions namespace..."); + try { + try { + admin.namespaces().getPolicies(workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e) { + if (e.getStatusCode() == Response.Status.NOT_FOUND.getStatusCode()) { + // if not found than create + try { + admin.namespaces().createNamespace(workerConfig.getPulsarFunctionsNamespace()); + } catch (PulsarAdminException e1) { + // prevent race condition with other workers starting up + if (e1.getStatusCode() != Response.Status.CONFLICT.getStatusCode()) { + log.error("Failed to create namespace {} for pulsar functions", workerConfig + .getPulsarFunctionsNamespace(), e1); + throw e1; + } + } + try { + admin.namespaces().setRetention( + workerConfig.getPulsarFunctionsNamespace(), + new RetentionPolicies(Integer.MAX_VALUE, Integer.MAX_VALUE)); + } catch (PulsarAdminException e1) { + log.error("Failed to set retention policy for pulsar functions namespace", e); + throw new RuntimeException(e1); + } + } else { + log.error("Failed to get retention policy for pulsar function namespace {}", + workerConfig.getPulsarFunctionsNamespace(), e); + throw e; + } + } + try { + internalConf = admin.brokers().getInternalConfigurationData(); + } catch (PulsarAdminException e) { + log.error("Failed to retrieve broker internal configuration", e); + throw e; + } + } finally { + admin.close(); + } + + // initialize the dlog namespace + // TODO: move this as part of pulsar cluster initialization later + try { + return Utils.initializeDlogNamespace( + internalConf.getZookeeperServers(), + internalConf.getLedgersRootPath()); + } catch (IOException ioe) { + log.error("Failed to initialize dlog namespace at zookeeper {} for storing function packages", + internalConf.getZookeeperServers(), ioe); + throw ioe; + } + } + @Override protected void doStop() { if (null != serverThread) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 20ffd8d..5df3c3f 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -20,22 +20,14 @@ package org.apache.pulsar.functions.worker; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; import java.net.URI; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.distributedlog.DistributedLogConfiguration; import org.apache.distributedlog.api.namespace.Namespace; import org.apache.distributedlog.api.namespace.NamespaceBuilder; -import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.functions.worker.rest.FunctionApiResource; -import org.apache.pulsar.functions.worker.rest.Resources; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.glassfish.jersey.server.ResourceConfig; -import org.glassfish.jersey.servlet.ServletContainer; /** * A service component contains everything to run a worker except rest server. @@ -58,15 +50,7 @@ public class WorkerService { this.workerConfig = workerConfig; } - public void start() throws InterruptedException { - try { - start(FunctionMetadataSetup.setupFunctionMetadata(workerConfig)); - } catch (PulsarAdminException | IOException e) { - throw new RuntimeException(e); - } - } - - private void start(URI dlogUri) throws InterruptedException { + public void start(URI dlogUri) throws InterruptedException { log.info("Starting worker {}...", workerConfig.getWorkerId()); try { log.info("Worker Configs: {}", new ObjectMapper().writerWithDefaultPrettyPrinter() -- To stop receiving notification emails like this one, please contact mme...@apache.org.