merlimat closed pull request #1655: Pulsar Functions CLI fixes
URL: https://github.com/apache/incubator-pulsar/pull/1655
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not display its
diff once the pull request has been merged, so the diff is reproduced below:

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index bf8ce6c058..682413cab2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -77,7 +77,6 @@ public boolean run(String[] args) {
                 System.err.println("Reason: " + e.getMessage());
                 return false;
             } catch (Exception e) {
-                System.err.println("Got exception: " + e.getMessage());
                 e.printStackTrace();
                 return false;
             }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c1446edce2..1369a6938a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
 import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import java.io.File;
@@ -27,10 +28,12 @@
 import java.lang.reflect.Type;
 import java.net.MalformedURLException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.IntStream;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -40,6 +43,7 @@
 import org.apache.bookkeeper.clients.StorageClientBuilder;
 import org.apache.bookkeeper.clients.config.StorageClientSettings;
 import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,7 @@
 @Slf4j
 @Parameters(commandDescription = "Interface for managing Pulsar Functions 
(lightweight, Lambda-style compute processes that work with Pulsar)")
 public class CmdFunctions extends CmdBase {
+    private static final String DEFAULT_SERVICE_URL = 
"pulsar://localhost:6650";
 
     private final LocalRunner localRunner;
     private final CreateFunction creater;
@@ -239,14 +244,20 @@ void processArguments() throws Exception {
             }
 
             if (null != inputs) {
-                
Arrays.asList(inputs.split(",")).forEach(functionConfig.getInputs()::add);
+                List<String> inputTopics = Arrays.asList(inputs.split(","));
+                inputTopics.forEach(this::validateTopicName);
+                functionConfig.setInputs(inputTopics);
             }
             if (null != customSerdeInputString) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new 
Gson().fromJson(customSerdeInputString, type);
+                customSerdeInputMap.forEach((topic, serde) -> {
+                    validateTopicName(topic);
+                });
                 functionConfig.setCustomSerdeInputs(customSerdeInputMap);
             }
             if (null != output) {
+                validateTopicName(output);
                 functionConfig.setOutput(output);
             }
             if (null != logTopic) {
@@ -281,10 +292,13 @@ void processArguments() throws Exception {
                 throw new RuntimeException("Either a Java jar or a Python file 
needs to be specified for the function");
             }
 
-            if (functionConfig.getInputs().size() == 0 && 
functionConfig.getCustomSerdeInputs().size() == 0) {
+            if (functionConfig.getInputs().isEmpty() && 
functionConfig.getCustomSerdeInputs().isEmpty()) {
                 throw new RuntimeException("No input topic(s) specified for 
the function");
             }
 
+            // Ensure that topics aren't being used as both input and output
+            verifyNoTopicClash(functionConfig.getInputs(), 
functionConfig.getOutput());;
+
             if (parallelism == null) {
                 if (functionConfig.getParallelism() == 0) {
                     functionConfig.setParallelism(1);
@@ -301,7 +315,7 @@ void processArguments() throws Exception {
                     && functionConfig.getSubscriptionType() != 
FunctionConfig.SubscriptionType.FAILOVER
                     && functionConfig.getProcessingGuarantees() != null
                     && functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                throw new IllegalArgumentException("Effectively Once can only 
be acheived with Failover subscription");
+                throw new IllegalArgumentException("Effectively-once 
processing semantics can only be achieved using a Failover subscription type");
             }
 
             functionConfig.setAutoAck(true);
@@ -309,6 +323,10 @@ void processArguments() throws Exception {
         }
 
         private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+            if (isNull(className)) {
+                throw new IllegalArgumentException("You supplied a jar file 
but no main class");
+            }
+
             File file = new File(jarFile);
             // check if the function class exists in Jar and it implements 
Function class
             if (!Reflections.classExistsInJar(file, 
functionConfig.getClassName())) {
@@ -431,22 +449,32 @@ private void doJavaSubmitChecks(FunctionConfig 
functionConfig) {
         }
 
         private void doPythonSubmitChecks(FunctionConfig functionConfig) {
+            if (functionConfig.getClassName() == null) {
+                throw new IllegalArgumentException("You specified a Python 
file but no main class name");
+            }
+
             if (functionConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                 throw new RuntimeException("Effectively-once processing 
guarantees not yet supported in Python");
             }
         }
 
+        private void validateTopicName(String topic) {
+            if (!TopicName.isValid(topic)) {
+                throw new IllegalArgumentException(String.format("The topic 
name %s is invalid", topic));
+            }
+        }
+
         private void inferMissingArguments(FunctionConfig functionConfig) {
-            if (functionConfig.getName() == null || 
functionConfig.getName().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getName())) {
                 inferMissingFunctionName(functionConfig);
             }
-            if (functionConfig.getTenant() == null || 
functionConfig.getTenant().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getTenant())) {
                 inferMissingTenant(functionConfig);
             }
-            if (functionConfig.getNamespace() == null || 
functionConfig.getNamespace().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getNamespace())) {
                 inferMissingNamespace(functionConfig);
             }
-            if (functionConfig.getOutput() == null || 
functionConfig.getOutput().isEmpty()) {
+            if (StringUtils.isEmpty(functionConfig.getOutput())) {
                 inferMissingOutput(functionConfig);
             }
         }
@@ -481,7 +509,8 @@ private void inferMissingNamespace(FunctionConfig 
functionConfig) {
         private void inferMissingOutput(FunctionConfig functionConfig) {
             try {
                 String inputTopic = getUniqueInput(functionConfig);
-                functionConfig.setOutput(inputTopic + "-" + 
functionConfig.getName() + "-output");
+                String outputTopic = String.format("%s-%s-output", inputTopic, 
functionConfig.getName());
+                functionConfig.setOutput(outputTopic);
             } catch (IllegalArgumentException ex) {
                 // It might be that we really don't need an output topic
                 // So we cannot really throw an exception
@@ -512,16 +541,14 @@ private String getUniqueInput(FunctionConfig 
functionConfig) {
 
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
 
             String serviceUrl = admin.getServiceUrl();
             if (brokerServiceUrl != null) {
                 serviceUrl = brokerServiceUrl;
             }
             if (serviceUrl == null) {
-                serviceUrl = "pulsar://localhost:6650";
+                serviceUrl = DEFAULT_SERVICE_URL;
             }
             try (ProcessRuntimeFactory containerFactory = new 
ProcessRuntimeFactory(
                     serviceUrl, null, null, null)) {
@@ -564,9 +591,7 @@ public void run() {
     class CreateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
             admin.functions().createFunction(convert(functionConfig), 
userCodeFile);
             print("Created successfully");
         }
@@ -605,9 +630,7 @@ void runCmd() throws Exception {
     class UpdateFunction extends FunctionDetailsCommand {
         @Override
         void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresent(functionConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
+            checkRequiredFields(functionConfig);
             admin.functions().updateFunction(convert(functionConfig), 
userCodeFile);
             print("Updated successfully");
         }
@@ -819,11 +842,34 @@ private static FunctionConfig loadConfig(File file) 
throws IOException {
         return  mapper.readValue(file, FunctionConfig.class);
     }
 
-    public static boolean areAllRequiredFieldsPresent(FunctionConfig 
functionConfig) {
-        return functionConfig.getTenant() != null && 
functionConfig.getNamespace() != null
-                && functionConfig.getName() != null && 
functionConfig.getClassName() != null
-                && (functionConfig.getInputs().size() > 0 || 
functionConfig.getCustomSerdeInputs().size() > 0)
-                && functionConfig.getParallelism() > 0;
+    private static void verifyNoTopicClash(Collection<String> inputTopics, 
String outputTopic) throws IllegalArgumentException {
+        if (inputTopics.contains(outputTopic)) {
+            throw new IllegalArgumentException(
+                    String.format("Output topic %s is also being used as an 
input topic (topics must be one or the other)",
+                            outputTopic));
+        }
+    }
+
+    private static void checkRequiredFields(FunctionConfig config) throws 
IllegalArgumentException {
+        if (isNull(config.getTenant())) {
+            throw new IllegalArgumentException("You must specify a tenant for 
the function");
+        }
+
+        if (isNull(config.getNamespace())) {
+            throw new IllegalArgumentException("You must specify a namespace 
for the function");
+        }
+
+        if (isNull(config.getName())) {
+            throw new IllegalArgumentException("You must specify a name for 
the function");
+        }
+
+        if (isNull(config.getClassName())) {
+            throw new IllegalArgumentException("You must specify a class name 
for the function");
+        }
+
+        if (config.getInputs().isEmpty() && 
config.getCustomSerdeInputs().isEmpty()) {
+            throw new IllegalArgumentException("You must specify one or more 
input topics for the function");
+        }
     }
     
     private org.apache.pulsar.functions.proto.Function.FunctionDetails 
convertProto2(FunctionConfig functionConfig)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to