merlimat closed pull request #1655: Pulsar Functions CLI fixes
URL: https://github.com/apache/incubator-pulsar/pull/1655
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
opened from a fork) once it has been merged, the diff is reproduced
below for the sake of provenance:
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
index bf8ce6c058..682413cab2 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdBase.java
@@ -77,7 +77,6 @@ public boolean run(String[] args) {
System.err.println("Reason: " + e.getMessage());
return false;
} catch (Exception e) {
- System.err.println("Got exception: " + e.getMessage());
e.printStackTrace();
return false;
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index c1446edce2..1369a6938a 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.Objects.isNull;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import java.io.File;
@@ -27,10 +28,12 @@
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.util.Arrays;
+import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.IntStream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
@@ -40,6 +43,7 @@
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.utils.NetUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -76,6 +80,7 @@
@Slf4j
@Parameters(commandDescription = "Interface for managing Pulsar Functions
(lightweight, Lambda-style compute processes that work with Pulsar)")
public class CmdFunctions extends CmdBase {
+ private static final String DEFAULT_SERVICE_URL =
"pulsar://localhost:6650";
private final LocalRunner localRunner;
private final CreateFunction creater;
@@ -239,14 +244,20 @@ void processArguments() throws Exception {
}
if (null != inputs) {
-
Arrays.asList(inputs.split(",")).forEach(functionConfig.getInputs()::add);
+ List<String> inputTopics = Arrays.asList(inputs.split(","));
+ inputTopics.forEach(this::validateTopicName);
+ functionConfig.setInputs(inputTopics);
}
if (null != customSerdeInputString) {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> customSerdeInputMap = new
Gson().fromJson(customSerdeInputString, type);
+ customSerdeInputMap.forEach((topic, serde) -> {
+ validateTopicName(topic);
+ });
functionConfig.setCustomSerdeInputs(customSerdeInputMap);
}
if (null != output) {
+ validateTopicName(output);
functionConfig.setOutput(output);
}
if (null != logTopic) {
@@ -281,10 +292,13 @@ void processArguments() throws Exception {
throw new RuntimeException("Either a Java jar or a Python file
needs to be specified for the function");
}
- if (functionConfig.getInputs().size() == 0 &&
functionConfig.getCustomSerdeInputs().size() == 0) {
+ if (functionConfig.getInputs().isEmpty() &&
functionConfig.getCustomSerdeInputs().isEmpty()) {
throw new RuntimeException("No input topic(s) specified for
the function");
}
+ // Ensure that topics aren't being used as both input and output
+ verifyNoTopicClash(functionConfig.getInputs(),
functionConfig.getOutput());;
+
if (parallelism == null) {
if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
@@ -301,7 +315,7 @@ void processArguments() throws Exception {
&& functionConfig.getSubscriptionType() !=
FunctionConfig.SubscriptionType.FAILOVER
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
- throw new IllegalArgumentException("Effectively Once can only
be acheived with Failover subscription");
+ throw new IllegalArgumentException("Effectively-once
processing semantics can only be achieved using a Failover subscription type");
}
functionConfig.setAutoAck(true);
@@ -309,6 +323,10 @@ void processArguments() throws Exception {
}
private void doJavaSubmitChecks(FunctionConfig functionConfig) {
+ if (isNull(className)) {
+ throw new IllegalArgumentException("You supplied a jar file
but no main class");
+ }
+
File file = new File(jarFile);
// check if the function class exists in Jar and it implements
Function class
if (!Reflections.classExistsInJar(file,
functionConfig.getClassName())) {
@@ -431,22 +449,32 @@ private void doJavaSubmitChecks(FunctionConfig
functionConfig) {
}
private void doPythonSubmitChecks(FunctionConfig functionConfig) {
+ if (functionConfig.getClassName() == null) {
+ throw new IllegalArgumentException("You specified a Python
file but no main class name");
+ }
+
if (functionConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
throw new RuntimeException("Effectively-once processing
guarantees not yet supported in Python");
}
}
+ private void validateTopicName(String topic) {
+ if (!TopicName.isValid(topic)) {
+ throw new IllegalArgumentException(String.format("The topic
name %s is invalid", topic));
+ }
+ }
+
private void inferMissingArguments(FunctionConfig functionConfig) {
- if (functionConfig.getName() == null ||
functionConfig.getName().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getName())) {
inferMissingFunctionName(functionConfig);
}
- if (functionConfig.getTenant() == null ||
functionConfig.getTenant().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getTenant())) {
inferMissingTenant(functionConfig);
}
- if (functionConfig.getNamespace() == null ||
functionConfig.getNamespace().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}
- if (functionConfig.getOutput() == null ||
functionConfig.getOutput().isEmpty()) {
+ if (StringUtils.isEmpty(functionConfig.getOutput())) {
inferMissingOutput(functionConfig);
}
}
@@ -481,7 +509,8 @@ private void inferMissingNamespace(FunctionConfig
functionConfig) {
private void inferMissingOutput(FunctionConfig functionConfig) {
try {
String inputTopic = getUniqueInput(functionConfig);
- functionConfig.setOutput(inputTopic + "-" +
functionConfig.getName() + "-output");
+ String outputTopic = String.format("%s-%s-output", inputTopic,
functionConfig.getName());
+ functionConfig.setOutput(outputTopic);
} catch (IllegalArgumentException ex) {
// It might be that we really don't need an output topic
// So we cannot really throw an exception
@@ -512,16 +541,14 @@ private String getUniqueInput(FunctionConfig
functionConfig) {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
String serviceUrl = admin.getServiceUrl();
if (brokerServiceUrl != null) {
serviceUrl = brokerServiceUrl;
}
if (serviceUrl == null) {
- serviceUrl = "pulsar://localhost:6650";
+ serviceUrl = DEFAULT_SERVICE_URL;
}
try (ProcessRuntimeFactory containerFactory = new
ProcessRuntimeFactory(
serviceUrl, null, null, null)) {
@@ -564,9 +591,7 @@ public void run() {
class CreateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
admin.functions().createFunction(convert(functionConfig),
userCodeFile);
print("Created successfully");
}
@@ -605,9 +630,7 @@ void runCmd() throws Exception {
class UpdateFunction extends FunctionDetailsCommand {
@Override
void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresent(functionConfig)) {
- throw new RuntimeException("Missing arguments");
- }
+ checkRequiredFields(functionConfig);
admin.functions().updateFunction(convert(functionConfig),
userCodeFile);
print("Updated successfully");
}
@@ -819,11 +842,34 @@ private static FunctionConfig loadConfig(File file)
throws IOException {
return mapper.readValue(file, FunctionConfig.class);
}
- public static boolean areAllRequiredFieldsPresent(FunctionConfig
functionConfig) {
- return functionConfig.getTenant() != null &&
functionConfig.getNamespace() != null
- && functionConfig.getName() != null &&
functionConfig.getClassName() != null
- && (functionConfig.getInputs().size() > 0 ||
functionConfig.getCustomSerdeInputs().size() > 0)
- && functionConfig.getParallelism() > 0;
+ private static void verifyNoTopicClash(Collection<String> inputTopics,
String outputTopic) throws IllegalArgumentException {
+ if (inputTopics.contains(outputTopic)) {
+ throw new IllegalArgumentException(
+ String.format("Output topic %s is also being used as an
input topic (topics must be one or the other)",
+ outputTopic));
+ }
+ }
+
+ private static void checkRequiredFields(FunctionConfig config) throws
IllegalArgumentException {
+ if (isNull(config.getTenant())) {
+ throw new IllegalArgumentException("You must specify a tenant for
the function");
+ }
+
+ if (isNull(config.getNamespace())) {
+ throw new IllegalArgumentException("You must specify a namespace
for the function");
+ }
+
+ if (isNull(config.getName())) {
+ throw new IllegalArgumentException("You must specify a name for
the function");
+ }
+
+ if (isNull(config.getClassName())) {
+ throw new IllegalArgumentException("You must specify a class name
for the function");
+ }
+
+ if (config.getInputs().isEmpty() &&
config.getCustomSerdeInputs().isEmpty()) {
+ throw new IllegalArgumentException("You must specify one or more
input topics for the function");
+ }
}
private org.apache.pulsar.functions.proto.Function.FunctionDetails
convertProto2(FunctionConfig functionConfig)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services