This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 14765b2 Avoid creating output topic on tenant namespace if
output-topic not provided (#2261)
14765b2 is described below
commit 14765b2c4b452dcc3a0d1a4fd488142fd5111a93
Author: Rajan Dhabalia <[email protected]>
AuthorDate: Tue Aug 7 01:06:13 2018 -0700
Avoid creating output topic on tenant namespace if output-topic not
provided (#2261)
* Avoid creating output topic on tenant namespace if output-topic not
provided
fix test
add flag to skip output topic
rename skip-output cmd
* fix test
---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 31 +++++++++++++-
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 24 ++++-------
.../functions/instance/JavaInstanceRunnable.java | 23 +++++-----
.../pulsar/functions/sink/PulsarSinkDisable.java | 49 ++++++++++++++++++++++
.../pulsar/functions/utils/FunctionConfig.java | 2 +-
5 files changed, 100 insertions(+), 29 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index edfb1c4..e206e75 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -76,6 +76,7 @@ import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNull;
/**
* Unit test of {@link CmdFunctions}.
@@ -434,11 +435,12 @@ public class CmdFunctionsTest {
}
@Test
- public void testCreateWithoutOutputTopic() throws Exception {
+ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
+ "--skip-output",
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -446,8 +448,33 @@ public class CmdFunctionsTest {
});
CreateFunction creater = cmd.getCreater();
- assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" +
"-output", creater.getFunctionConfig().getOutput());
+ assertNull(creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionDetails.class),
anyString());
+
+ }
+
+
+ @Test
+ public void testCreateWithoutOutputTopic() throws Exception {
+
+ ConsoleOutputCapturer consoleOutputCapturer = new
ConsoleOutputCapturer();
+ consoleOutputCapturer.start();
+
+ String inputTopicName = TEST_NAME + "-input-topic";
+ cmd.run(new String[] {
+ "create",
+ "--inputs", inputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ });
+
+ CreateFunction creater = cmd.getCreater();
+ consoleOutputCapturer.stop();
+ String output = consoleOutputCapturer.getStderr();
+ assertNull(creater.getFunctionConfig().getOutput());
+ assertTrue(output.contains("output topic is not present"));
}
@Test
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 23a6cb8..a4152a4 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -233,8 +233,10 @@ public class CmdFunctions extends CmdBase {
protected String DEPRECATED_topicsPattern;
@Parameter(names = "--topics-pattern", description = "The topic
pattern to consume from list of topics under a namespace that match the
pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe
class name for a pattern in --custom-serde-inputs (supported for java fun
only)")
protected String topicsPattern;
- @Parameter(names = "--output", description = "The function's output
topic")
+ @Parameter(names = "--output", description = "The function's output
topic (use skipOutput flag to skip output topic)")
protected String output;
+ @Parameter(names = "--skip-output", description = "Skip publishing
function output to output topic")
+ protected boolean skipOutput;
// for backwards compatibility purposes
@Parameter(names = "--logTopic", description = "The topic to which the
function's logs are produced", hidden = true)
protected String DEPRECATED_logTopic;
@@ -367,6 +369,7 @@ public class CmdFunctions extends CmdBase {
if (null != output) {
functionConfig.setOutput(output);
}
+ functionConfig.setSkipOutput(skipOutput);
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
@@ -461,6 +464,11 @@ public class CmdFunctions extends CmdBase {
}
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+
+ if (isBlank(functionConfig.getOutput()) &&
!functionConfig.isSkipOutput()) {
+ throw new ParameterException(
+ "output topic is not present (pass skipOutput flag to
skip publish output on topic)");
+ }
if (isNotBlank(functionConfig.getJar()) &&
isNotBlank(functionConfig.getPy())) {
throw new ParameterException("Either a Java jar or a Python
file needs to"
@@ -534,9 +542,6 @@ public class CmdFunctions extends CmdBase {
if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}
- if (StringUtils.isEmpty(functionConfig.getOutput())) {
- inferMissingOutput(functionConfig);
- }
if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
@@ -581,17 +586,6 @@ public class CmdFunctions extends CmdBase {
functionConfig.setNamespace(DEFAULT_NAMESPACE);
}
- private void inferMissingOutput(FunctionConfig functionConfig) {
- try {
- String inputTopic = getUniqueInput(functionConfig);
- String outputTopic = String.format("%s-%s-output", inputTopic,
functionConfig.getName());
- functionConfig.setOutput(outputTopic);
- } catch (IllegalArgumentException ex) {
- // It might be that we really don't need an output topic
- // So we cannot really throw an exception
- }
- }
-
private String getUniqueInput(FunctionConfig functionConfig) {
if (functionConfig.getInputs().size() +
functionConfig.getCustomSerdeInputs().size() != 1) {
throw new IllegalArgumentException();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3480fd9..aea9d53 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -63,6 +63,7 @@ import
org.apache.pulsar.functions.proto.InstanceCommunication;
import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -560,19 +561,19 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
Object object;
// If sink classname is not set, we default pulsar sink
if (sinkSpec.getClassName().isEmpty()) {
- PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
-
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
-
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
- pulsarSinkConfig.setTopic(sinkSpec.getTopic());
- pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
- pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
- Object[] params = {this.client, pulsarSinkConfig};
- Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
+ if (StringUtils.isEmpty(sinkSpec.getTopic())) {
+ object = PulsarSinkDisable.INSTANCE;
+ } else {
+ PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
+
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees
+
.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+ pulsarSinkConfig.setTopic(sinkSpec.getTopic());
+
pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+ pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
+ object = new PulsarSink(this.client, pulsarSinkConfig);
+ }
- object = Reflections.createInstance(
- PulsarSink.class.getName(),
- PulsarSink.class.getClassLoader(), params, paramTypes);
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
new file mode 100644
index 0000000..8d71313
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import java.util.Map;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarSinkDisable<T> implements Sink<T> {
+
+ public static final PulsarSinkDisable INSTANCE = new PulsarSinkDisable();
+
+ @Override
+ public void close() throws Exception {
+ // No-op
+ }
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ // No-op
+ }
+
+ @Override
+ public void write(Record<T> record) throws Exception {
+ // No-op
+ }
+
+}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 8d91b8e..b0d340f 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,7 +29,6 @@ import
org.apache.pulsar.functions.utils.validation.ConfigValidation;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
-import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
@@ -80,6 +79,7 @@ public class FunctionConfig {
private String topicsPattern;
@isValidTopicName
private String output;
+ private boolean skipOutput;
@isImplementationOfClass(implementsClass = SerDe.class)
private String outputSerdeClassName;
@isValidTopicName