rdhabalia closed pull request #2261: Avoid creating output topic on tenant
namespace if output-topic not provided
URL: https://github.com/apache/incubator-pulsar/pull/2261
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index edfb1c4459..e206e75253 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -76,6 +76,7 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNull;
/**
* Unit test of {@link CmdFunctions}.
@@ -434,11 +435,12 @@ public void testCreateWithoutFunctionName() throws
Exception {
}
@Test
- public void testCreateWithoutOutputTopic() throws Exception {
+ public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
"--inputs", inputTopicName,
+ "--skip-output",
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -446,8 +448,33 @@ public void testCreateWithoutOutputTopic() throws
Exception {
});
CreateFunction creater = cmd.getCreater();
- assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" +
"-output", creater.getFunctionConfig().getOutput());
+ assertNull(creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionDetails.class),
anyString());
+
+ }
+
+
+ @Test
+ public void testCreateWithoutOutputTopic() throws Exception {
+
+ ConsoleOutputCapturer consoleOutputCapturer = new
ConsoleOutputCapturer();
+ consoleOutputCapturer.start();
+
+ String inputTopicName = TEST_NAME + "-input-topic";
+ cmd.run(new String[] {
+ "create",
+ "--inputs", inputTopicName,
+ "--jar", "SomeJar.jar",
+ "--tenant", "sample",
+ "--namespace", "ns1",
+ "--className", DummyFunction.class.getName(),
+ });
+
+ CreateFunction creater = cmd.getCreater();
+ consoleOutputCapturer.stop();
+ String output = consoleOutputCapturer.getStderr();
+ assertNull(creater.getFunctionConfig().getOutput());
+ assertTrue(output.contains("output topic is not present"));
}
@Test
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 23a6cb868e..a4152a4588 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -233,8 +233,10 @@ void processArguments() throws Exception {
protected String DEPRECATED_topicsPattern;
@Parameter(names = "--topics-pattern", description = "The topic
pattern to consume from list of topics under a namespace that match the
pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe
class name for a pattern in --custom-serde-inputs (supported for java fun
only)")
protected String topicsPattern;
- @Parameter(names = "--output", description = "The function's output
topic")
+ @Parameter(names = "--output", description = "The function's output
topic (use skipOutput flag to skip output topic)")
protected String output;
+ @Parameter(names = "--skip-output", description = "Skip publishing
function output to output topic")
+ protected boolean skipOutput;
// for backwards compatibility purposes
@Parameter(names = "--logTopic", description = "The topic to which the
function's logs are produced", hidden = true)
protected String DEPRECATED_logTopic;
@@ -367,6 +369,7 @@ void processArguments() throws Exception {
if (null != output) {
functionConfig.setOutput(output);
}
+ functionConfig.setSkipOutput(skipOutput);
if (null != logTopic) {
functionConfig.setLogTopic(logTopic);
}
@@ -461,6 +464,11 @@ void processArguments() throws Exception {
}
protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+
+ if (isBlank(functionConfig.getOutput()) &&
!functionConfig.isSkipOutput()) {
+ throw new ParameterException(
+ "output topic is not present (pass skipOutput flag to
skip publish output on topic)");
+ }
if (isNotBlank(functionConfig.getJar()) &&
isNotBlank(functionConfig.getPy())) {
throw new ParameterException("Either a Java jar or a Python
file needs to"
@@ -534,9 +542,6 @@ private void inferMissingArguments(FunctionConfig
functionConfig) {
if (StringUtils.isEmpty(functionConfig.getNamespace())) {
inferMissingNamespace(functionConfig);
}
- if (StringUtils.isEmpty(functionConfig.getOutput())) {
- inferMissingOutput(functionConfig);
- }
if (functionConfig.getParallelism() == 0) {
functionConfig.setParallelism(1);
@@ -581,17 +586,6 @@ private void inferMissingNamespace(FunctionConfig
functionConfig) {
functionConfig.setNamespace(DEFAULT_NAMESPACE);
}
- private void inferMissingOutput(FunctionConfig functionConfig) {
- try {
- String inputTopic = getUniqueInput(functionConfig);
- String outputTopic = String.format("%s-%s-output", inputTopic,
functionConfig.getName());
- functionConfig.setOutput(outputTopic);
- } catch (IllegalArgumentException ex) {
- // It might be that we really don't need an output topic
- // So we cannot really throw an exception
- }
- }
-
private String getUniqueInput(FunctionConfig functionConfig) {
if (functionConfig.getInputs().size() +
functionConfig.getCustomSerdeInputs().size() != 1) {
throw new IllegalArgumentException();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3480fd9e68..aea9d53d93 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -63,6 +63,7 @@
import
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -560,19 +561,19 @@ public void setupOutput(ContextImpl contextImpl) throws
Exception {
Object object;
// If sink classname is not set, we default pulsar sink
if (sinkSpec.getClassName().isEmpty()) {
- PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
-
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
-
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
- pulsarSinkConfig.setTopic(sinkSpec.getTopic());
- pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
- pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
- Object[] params = {this.client, pulsarSinkConfig};
- Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
+ if (StringUtils.isEmpty(sinkSpec.getTopic())) {
+ object = PulsarSinkDisable.INSTANCE;
+ } else {
+ PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
+
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees
+
.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+ pulsarSinkConfig.setTopic(sinkSpec.getTopic());
+
pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+ pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
+ object = new PulsarSink(this.client, pulsarSinkConfig);
+ }
- object = Reflections.createInstance(
- PulsarSink.class.getName(),
- PulsarSink.class.getClassLoader(), params, paramTypes);
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
new file mode 100644
index 0000000000..8d713134f1
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import java.util.Map;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarSinkDisable<T> implements Sink<T> {
+
+ public static final PulsarSinkDisable INSTANCE = new PulsarSinkDisable();
+
+ @Override
+ public void close() throws Exception {
+ // No-op
+ }
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext)
throws Exception {
+ // No-op
+ }
+
+ @Override
+ public void write(Record<T> record) throws Exception {
+ // No-op
+ }
+
+}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 8d91b8e8a0..b0d340fa92 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,7 +29,6 @@
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
-import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
import
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
@@ -80,6 +79,7 @@
private String topicsPattern;
@isValidTopicName
private String output;
+ private boolean skipOutput;
@isImplementationOfClass(implementsClass = SerDe.class)
private String outputSerdeClassName;
@isValidTopicName
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services