srkukarni closed pull request #2472: Create function assignment topic during 
standalone mode
URL: https://github.com/apache/incubator-pulsar/pull/2472
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index ca5ac5b968..56932464f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -890,6 +890,8 @@ private void startWorkerService() throws 
InterruptedException, IOException, Keep
             LOG.info("Starting function worker service");
             String namespace = functionWorkerService.get()
                     .getWorkerConfig().getPulsarFunctionsNamespace();
+            String assignmentNamespace = functionWorkerService.get()
+                    .getWorkerConfig().getPulsarAssignmentNamespace();
             String[] a = 
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsNamespace().split("/");
             String property = a[0];
             String cluster = 
functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster();
@@ -963,6 +965,27 @@ private void startWorkerService() throws 
InterruptedException, IOException, Keep
                 throw e;
             }
 
+            // create namespace for function assignment worker service
+            try {
+                Policies policies = new Policies();
+                policies.replication_clusters = 
Collections.singleton(functionWorkerService.get().getWorkerConfig().getPulsarFunctionsCluster());
+                int defaultNumberOfBundles = 
this.getConfiguration().getDefaultNumberOfNamespaceBundles();
+                policies.bundles = getBundles(defaultNumberOfBundles);
+
+                
this.getConfigurationCache().policiesCache().invalidate(AdminResource.path(POLICIES,
 assignmentNamespace));
+                
ZkUtils.createFullPathOptimistic(this.getGlobalZkCache().getZooKeeper(),
+                        AdminResource.path(POLICIES, assignmentNamespace),
+                        
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies),
+                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                        CreateMode.PERSISTENT);
+                LOG.info("Created namespace {} for function worker service", 
assignmentNamespace);
+            } catch (KeeperException.NodeExistsException e) {
+                LOG.debug("Failed to create already existing namespace {} for 
function worker service", assignmentNamespace);
+            } catch (Exception e) {
+                LOG.error("Failed to create namespace {}", 
assignmentNamespace, e);
+                throw e;
+            }
+
             InternalConfigurationData internalConf = new 
InternalConfigurationData(
                     this.getConfiguration().getZookeeperServers(),
                     this.getConfiguration().getConfigurationStoreServers(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to