This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7484664 Pulsar Functions CLI fixes (#1655)
7484664 is described below
commit 74846643303385cce847120b86c8342e50950492
Author: Luc Perkins <[email protected]>
AuthorDate: Thu Apr 26 16:45:36 2018 -0700
Pulsar Functions CLI fixes (#1655)
* add new argument checker function
* fix misspelling in variable name
* add check for class name to doJavaSubmitChecks
* add default service URL as constant
* fix input topic checking logic
* remove 'got exception' line from Exception printing
* add check for Python class name
* add basic topic name validation
* fix misspelling in getClassname call
* use formatted string for inferred output topic name
* ensure topics aren't used as both input and output
---
.../java/org/apache/pulsar/admin/cli/CmdBase.java | 1 -
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 92 ++++++++++++++++------
2 files changed, 69 insertions(+), 24 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index bf8ce6c..682413c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -77,7 +77,6 @@ public abstract class CmdBase {
System.err.println("Reason: " + e.getMessage());
return false;
} catch (Exception e) {
- System.err.println("Got exception: " + e.getMessage());
e.printStackTrace();
return false;
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c1446ed..1369a69 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.admin.cli;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import java.io.File;
@@ -27,10 +28,12 @@ import java.io.IOException;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -40,6 +43,7 @@ import org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,7 @@ import net.jodah.typetools.TypeResolver;
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions
(lightweight, Lambda-style compute processes that work with Pulsar)")
public class CmdFunctions extends CmdBase {
+ private static final String DEFAULT_SERVICE_URL =
"pulsar://localhost:6650";
private final LocalRunner localRunner;
private final CreateFunction creater;
@@ -239,14 +244,20 @@ public class CmdFunctions extends CmdBase {
}
if (null != inputs) {
-
Arrays.asList(inputs.split(",")).forEach(functionConfig.getInputs()::add);
+ List<String> inputTopics = Arrays.asList(inputs.split(","));
+ inputTopics.forEach(this::validateTopicName);
+ functionConfig.setInputs(inputTopics);
}
if (null != customSerdeInputString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> customSerdeInputMap = new
Gson().fromJson(customSerdeInputString, type);
+ customSerdeInputMap.forEach((topic, serde) -> {
+ validateTopicName(topic);
+ });
functionConfig.setCustomSerdeInputs(customSerdeInputMap);
}
if (null != output) {
+ validateTopicName(output);
functionConfig.setOutput(output);
}
if (null != logTopic) {
@@ -281,10 +292,13 @@ public class CmdFunctions extends CmdBase {
throw new RuntimeException("Either a Java jar or a Python file
needs to be specified for the function");
}
- if (functionConfig.getInputs().size() == 0 &&
functionConfig.getCustomSerdeInputs().size() == 0) {
+ if (functionConfig.getInputs().isEmpty() &&
functionConfig.getCustomSerdeInputs().isEmpty()) {
throw new RuntimeException("No input topic(s) specified for
the function");
}
+ // Ensure that topics aren't being used as both input and output
+ verifyNoTopicClash(functionConfig.getInputs(),
functionConfig.getOutput());;
+
if (parallelism == null) {
if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
@@ -301,7 +315,7 @@ public class CmdFunctions extends CmdBase {
&& functionConfig.getSubscriptionType() !=
FunctionConfig.SubscriptionType.FAILOVER
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException("Effectively Once can only
be acheived with Failover subscription");
+ throw new IllegalArgumentException("Effectively-once
processing semantics can only be achieved using a Failover subscription type");
}
functionConfig.setAutoAck(true);
@@ -309,6 +323,10 @@ public class CmdFunctions extends CmdBase {
}
private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+ if (isNull(className)) {
+ throw new IllegalArgumentException("You supplied a jar file
but no main class");
+ }
+
File file = new File(jarFile);
// check if the function class exists in Jar and it implements
Function class
if (!Reflections.classExistsInJar(file,
functionConfig.getClassName())) {
@@ -431,22 +449,32 @@ public class CmdFunctions extends CmdBase {
}
private void doPythonSubmitChecks(FunctionConfig functionConfig) {
+ if (functionConfig.getClassName() == null) {
+ throw new IllegalArgumentException("You specified a Python
file but no main class name");
+ }
+
if (functionConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new RuntimeException("Effectively-once processing
guarantees not yet supported in Python");
}
}
+ private void validateTopicName(String topic) {
+ if (!TopicName.isValid(topic)) {
+ throw new IllegalArgumentException(String.format("The topic
name %s is invalid", topic));
+ }
+ }
+
private void inferMissingArguments(FunctionConfig functionConfig) {
- if (functionConfig.getName() == null ||
functionConfig.getName().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getName())) {
inferMissingFunctionName(functionConfig);
}
- if (functionConfig.getTenant() == null ||
functionConfig.getTenant().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getTenant())) {
inferMissingTenant(functionConfig);
}
- if (functionConfig.getNamespace() == null ||
functionConfig.getNamespace().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}
- if (functionConfig.getOutput() == null ||
functionConfig.getOutput().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getOutput())) {
inferMissingOutput(functionConfig);
}
}
@@ -481,7 +509,8 @@ public class CmdFunctions extends CmdBase {
private void inferMissingOutput(FunctionConfig functionConfig) {
try {
String inputTopic = getUniqueInput(functionConfig);
- functionConfig.setOutput(inputTopic + "-" +
functionConfig.getName() + "-output");
+ String outputTopic = String.format("%s-%s-output", inputTopic,
functionConfig.getName());
+ functionConfig.setOutput(outputTopic);
} catch (IllegalArgumentException ex) {
// It might be that we really don't need an output topic
// So we cannot really throw an exception
@@ -512,16 +541,14 @@ public class CmdFunctions extends CmdBase {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
String serviceUrl = admin.getServiceUrl();
if (brokerServiceUrl != null) {
serviceUrl = brokerServiceUrl;
}
if (serviceUrl == null) {
- serviceUrl = "pulsar://localhost:6650";
+ serviceUrl = DEFAULT_SERVICE_URL;
}
try (ProcessRuntimeFactory containerFactory = new
ProcessRuntimeFactory(
serviceUrl, null, null, null)) {
@@ -564,9 +591,7 @@ public class CmdFunctions extends CmdBase {
class CreateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
admin.functions().createFunction(convert(functionConfig),
userCodeFile);
print("Created successfully");
}
@@ -605,9 +630,7 @@ public class CmdFunctions extends CmdBase {
class UpdateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
admin.functions().updateFunction(convert(functionConfig),
userCodeFile);
print("Updated successfully");
}
@@ -819,11 +842,34 @@ public class CmdFunctions extends CmdBase {
return mapper.readValue(file, FunctionConfig.class);
}
- public static boolean areAllRequiredFieldsPresent(FunctionConfig
functionConfig) {
- return functionConfig.getTenant() != null &&
functionConfig.getNamespace() != null
- && functionConfig.getName() != null &&
functionConfig.getClassName() != null
- && (functionConfig.getInputs().size() > 0 ||
functionConfig.getCustomSerdeInputs().size() > 0)
- && functionConfig.getParallelism() > 0;
+ private static void verifyNoTopicClash(Collection<String> inputTopics,
String outputTopic) throws IllegalArgumentException {
+ if (inputTopics.contains(outputTopic)) {
+ throw new IllegalArgumentException(
+ String.format("Output topic %s is also being used as an
input topic (topics must be one or the other)",
+ outputTopic));
+ }
+ }
+
+ private static void checkRequiredFields(FunctionConfig config) throws
IllegalArgumentException {
+ if (isNull(config.getTenant())) {
+ throw new IllegalArgumentException("You must specify a tenant for
the function");
+ }
+
+ if (isNull(config.getNamespace())) {
+ throw new IllegalArgumentException("You must specify a namespace
for the function");
+ }
+
+ if (isNull(config.getName())) {
+ throw new IllegalArgumentException("You must specify a name for
the function");
+ }
+
+ if (isNull(config.getClassName())) {
+ throw new IllegalArgumentException("You must specify a class name
for the function");
+ }
+
+ if (config.getInputs().isEmpty() &&
config.getCustomSerdeInputs().isEmpty()) {
+ throw new IllegalArgumentException("You must specify one or more
input topics for the function");
+ }
}
private org.apache.pulsar.functions.proto.Function.FunctionDetails
convertProto2(FunctionConfig functionConfig)
--
To stop receiving notification emails like this one, please contact
[email protected].