This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 14765b2 Avoid creating output topic on tenant namespace if output-topic not provided (#2261) 14765b2 is described below commit 14765b2c4b452dcc3a0d1a4fd488142fd5111a93 Author: Rajan Dhabalia <rdhaba...@apache.org> AuthorDate: Tue Aug 7 01:06:13 2018 -0700 Avoid creating output topic on tenant namespace if output-topic not provided (#2261) * Avoid creating output topic on tenant namespace if output-topic not provided fix test add flag to skip output topic rename skip-output cmd * fix test --- .../apache/pulsar/admin/cli/CmdFunctionsTest.java | 31 +++++++++++++- .../org/apache/pulsar/admin/cli/CmdFunctions.java | 24 ++++------- .../functions/instance/JavaInstanceRunnable.java | 23 +++++----- .../pulsar/functions/sink/PulsarSinkDisable.java | 49 ++++++++++++++++++++++ .../pulsar/functions/utils/FunctionConfig.java | 2 +- 5 files changed, 100 insertions(+), 29 deletions(-) diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java index edfb1c4..e206e75 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java @@ -76,6 +76,7 @@ import static org.mockito.Mockito.when; import static org.powermock.api.mockito.PowerMockito.mockStatic; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertNull; /** * Unit test of {@link CmdFunctions}. @@ -434,11 +435,12 @@ public class CmdFunctionsTest { } @Test - public void testCreateWithoutOutputTopic() throws Exception { + public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception { String inputTopicName = TEST_NAME + "-input-topic"; cmd.run(new String[] { "create", "--inputs", inputTopicName, + "--skip-output", "--jar", "SomeJar.jar", "--tenant", "sample", "--namespace", "ns1", @@ -446,8 +448,33 @@ public class CmdFunctionsTest { }); CreateFunction creater = cmd.getCreater(); - assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput()); + assertNull(creater.getFunctionConfig().getOutput()); verify(functions, times(1)).createFunction(any(FunctionDetails.class), anyString()); + + } + + + @Test + public void testCreateWithoutOutputTopic() throws Exception { + + ConsoleOutputCapturer consoleOutputCapturer = new ConsoleOutputCapturer(); + consoleOutputCapturer.start(); + + String inputTopicName = TEST_NAME + "-input-topic"; + cmd.run(new String[] { + "create", + "--inputs", inputTopicName, + "--jar", "SomeJar.jar", + "--tenant", "sample", + "--namespace", "ns1", + "--className", DummyFunction.class.getName(), + }); + + CreateFunction creater = cmd.getCreater(); + consoleOutputCapturer.stop(); + String output = consoleOutputCapturer.getStderr(); + assertNull(creater.getFunctionConfig().getOutput()); + assertTrue(output.contains("output topic is not present")); } @Test diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java index 23a6cb8..a4152a4 100644 --- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java +++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java @@ -233,8 +233,10 @@ public class CmdFunctions extends CmdBase { protected String DEPRECATED_topicsPattern; @Parameter(names = "--topics-pattern", description = "The topic pattern to consume from list of topics under a namespace that match the pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe class name for a pattern in --custom-serde-inputs (supported for java fun only)") protected String topicsPattern; - @Parameter(names = "--output", description = "The function's output topic") + @Parameter(names = "--output", description = "The function's output topic (use skipOutput flag to skip output topic)") protected String output; + @Parameter(names = "--skip-output", description = "Skip publishing function output to output topic") + protected boolean skipOutput; // for backwards compatibility purposes @Parameter(names = "--logTopic", description = "The topic to which the function's logs are produced", hidden = true) protected String DEPRECATED_logTopic; @@ -367,6 +369,7 @@ public class CmdFunctions extends CmdBase { if (null != output) { functionConfig.setOutput(output); } + functionConfig.setSkipOutput(skipOutput); if (null != logTopic) { functionConfig.setLogTopic(logTopic); } @@ -461,6 +464,11 @@ public class CmdFunctions extends CmdBase { } protected void validateFunctionConfigs(FunctionConfig functionConfig) { + + if (isBlank(functionConfig.getOutput()) && !functionConfig.isSkipOutput()) { + throw new ParameterException( + "output topic is not present (pass skipOutput flag to skip publish output on topic)"); + } if (isNotBlank(functionConfig.getJar()) && isNotBlank(functionConfig.getPy())) { throw new ParameterException("Either a Java jar or a Python file needs to" @@ -534,9 +542,6 @@ public class CmdFunctions extends CmdBase { if (StringUtils.isEmpty(functionConfig.getNamespace())) { inferMissingNamespace(functionConfig); } - if (StringUtils.isEmpty(functionConfig.getOutput())) { - inferMissingOutput(functionConfig); - } if (functionConfig.getParallelism() == 0) { functionConfig.setParallelism(1); @@ -581,17 +586,6 @@ public class CmdFunctions extends CmdBase { functionConfig.setNamespace(DEFAULT_NAMESPACE); } - private void inferMissingOutput(FunctionConfig functionConfig) { - try { - String inputTopic = getUniqueInput(functionConfig); - String outputTopic = String.format("%s-%s-output", inputTopic, functionConfig.getName()); - functionConfig.setOutput(outputTopic); - } catch (IllegalArgumentException ex) { - // It might be that we really don't need an output topic - // So we cannot really throw an exception - } - } - private String getUniqueInput(FunctionConfig functionConfig) { if (functionConfig.getInputs().size() + functionConfig.getCustomSerdeInputs().size() != 1) { throw new IllegalArgumentException(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 3480fd9..aea9d53 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -63,6 +63,7 @@ import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; +import org.apache.pulsar.functions.sink.PulsarSinkDisable; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; @@ -560,19 +561,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Object object; // If sink classname is not set, we default pulsar sink if (sinkSpec.getClassName().isEmpty()) { - PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig(); - pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf( - this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); - pulsarSinkConfig.setTopic(sinkSpec.getTopic()); - pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName()); - pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); - Object[] params = {this.client, pulsarSinkConfig}; - Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class}; + if (StringUtils.isEmpty(sinkSpec.getTopic())) { + object = PulsarSinkDisable.INSTANCE; + } else { + PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig(); + pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees + .valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name())); + pulsarSinkConfig.setTopic(sinkSpec.getTopic()); + pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName()); + pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName()); + object = new PulsarSink(this.client, pulsarSinkConfig); + } - object = Reflections.createInstance( - PulsarSink.class.getName(), - PulsarSink.class.getClassLoader(), params, paramTypes); } else { object = Reflections.createInstance( sinkSpec.getClassName(), diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java new file mode 100644 index 0000000..8d71313 --- /dev/null +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.sink; + +import java.util.Map; + +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class PulsarSinkDisable<T> implements Sink<T> { + + public static final PulsarSinkDisable INSTANCE = new PulsarSinkDisable(); + + @Override + public void close() throws Exception { + // No-op + } + + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + // No-op + } + + @Override + public void write(Record<T> record) throws Exception { + // No-op + } + +} diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java index 8d91b8e..b0d340f 100644 --- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java +++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java @@ -29,7 +29,6 @@ import org.apache.pulsar.functions.utils.validation.ConfigValidation; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass; -import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom; import org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber; @@ -80,6 +79,7 @@ public class FunctionConfig { private String topicsPattern; @isValidTopicName private String output; + private boolean skipOutput; @isImplementationOfClass(implementsClass = SerDe.class) private String outputSerdeClassName; @isValidTopicName