This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 14765b2  Avoid creating output topic on tenant namespace if 
output-topic not provided (#2261)
14765b2 is described below

commit 14765b2c4b452dcc3a0d1a4fd488142fd5111a93
Author: Rajan Dhabalia <rdhaba...@apache.org>
AuthorDate: Tue Aug 7 01:06:13 2018 -0700

    Avoid creating output topic on tenant namespace if output-topic not 
provided (#2261)
    
    * Avoid creating output topic on tenant namespace if output-topic not 
provided
    
    fix test
    
    add flag to skip output topic
    
    rename skip-output cmd
    
    * fix test
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  | 31 +++++++++++++-
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  | 24 ++++-------
 .../functions/instance/JavaInstanceRunnable.java   | 23 +++++-----
 .../pulsar/functions/sink/PulsarSinkDisable.java   | 49 ++++++++++++++++++++++
 .../pulsar/functions/utils/FunctionConfig.java     |  2 +-
 5 files changed, 100 insertions(+), 29 deletions(-)

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index edfb1c4..e206e75 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -76,6 +76,7 @@ import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertNull;
 
 /**
  * Unit test of {@link CmdFunctions}.
@@ -434,11 +435,12 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateWithoutOutputTopic() throws Exception {
+    public void testCreateWithoutOutputTopicWithSkipFlag() throws Exception {
         String inputTopicName = TEST_NAME + "-input-topic";
         cmd.run(new String[] {
                 "create",
                 "--inputs", inputTopicName,
+                "--skip-output",
                 "--jar", "SomeJar.jar",
                 "--tenant", "sample",
                 "--namespace", "ns1",
@@ -446,8 +448,33 @@ public class CmdFunctionsTest {
         });
 
         CreateFunction creater = cmd.getCreater();
-        assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + 
"-output", creater.getFunctionConfig().getOutput());
+        assertNull(creater.getFunctionConfig().getOutput());
         verify(functions, times(1)).createFunction(any(FunctionDetails.class), 
anyString());
+
+    }
+
+    
+    @Test
+    public void testCreateWithoutOutputTopic() throws Exception {
+
+        ConsoleOutputCapturer consoleOutputCapturer = new 
ConsoleOutputCapturer();
+        consoleOutputCapturer.start();
+
+        String inputTopicName = TEST_NAME + "-input-topic";
+        cmd.run(new String[] {
+                "create",
+                "--inputs", inputTopicName,
+                "--jar", "SomeJar.jar",
+                "--tenant", "sample",
+                "--namespace", "ns1",
+                "--className", DummyFunction.class.getName(),
+        });
+
+        CreateFunction creater = cmd.getCreater();
+        consoleOutputCapturer.stop();
+        String output = consoleOutputCapturer.getStderr();
+        assertNull(creater.getFunctionConfig().getOutput());
+        assertTrue(output.contains("output topic is not present"));
     }
 
     @Test
diff --git 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 23a6cb8..a4152a4 100644
--- 
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ 
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -233,8 +233,10 @@ public class CmdFunctions extends CmdBase {
         protected String DEPRECATED_topicsPattern;
         @Parameter(names = "--topics-pattern", description = "The topic 
pattern to consume from list of topics under a namespace that match the 
pattern. [--input] and [--topic-pattern] are mutually exclusive. Add SerDe 
class name for a pattern in --custom-serde-inputs (supported for java fun 
only)")
         protected String topicsPattern;
-        @Parameter(names = "--output", description = "The function's output 
topic")
+        @Parameter(names = "--output", description = "The function's output 
topic (use skipOutput flag to skip output topic)")
         protected String output;
+        @Parameter(names = "--skip-output", description = "Skip publishing 
function output to output topic")
+        protected boolean skipOutput;
         // for backwards compatibility purposes
         @Parameter(names = "--logTopic", description = "The topic to which the 
function's logs are produced", hidden = true)
         protected String DEPRECATED_logTopic;
@@ -367,6 +369,7 @@ public class CmdFunctions extends CmdBase {
             if (null != output) {
                 functionConfig.setOutput(output);
             }
+            functionConfig.setSkipOutput(skipOutput);
             if (null != logTopic) {
                 functionConfig.setLogTopic(logTopic);
             }
@@ -461,6 +464,11 @@ public class CmdFunctions extends CmdBase {
         }
 
         protected void validateFunctionConfigs(FunctionConfig functionConfig) {
+            
+            if (isBlank(functionConfig.getOutput()) && 
!functionConfig.isSkipOutput()) {
+                throw new ParameterException(
+                        "output topic is not present (pass skipOutput flag to 
skip publish output on topic)");
+            }
 
             if (isNotBlank(functionConfig.getJar()) && 
isNotBlank(functionConfig.getPy())) {
                 throw new ParameterException("Either a Java jar or a Python 
file needs to"
@@ -534,9 +542,6 @@ public class CmdFunctions extends CmdBase {
             if (StringUtils.isEmpty(functionConfig.getNamespace())) {
                 inferMissingNamespace(functionConfig);
             }
-            if (StringUtils.isEmpty(functionConfig.getOutput())) {
-                inferMissingOutput(functionConfig);
-            }
 
             if (functionConfig.getParallelism() == 0) {
                 functionConfig.setParallelism(1);
@@ -581,17 +586,6 @@ public class CmdFunctions extends CmdBase {
             functionConfig.setNamespace(DEFAULT_NAMESPACE);
         }
 
-        private void inferMissingOutput(FunctionConfig functionConfig) {
-            try {
-                String inputTopic = getUniqueInput(functionConfig);
-                String outputTopic = String.format("%s-%s-output", inputTopic, 
functionConfig.getName());
-                functionConfig.setOutput(outputTopic);
-            } catch (IllegalArgumentException ex) {
-                // It might be that we really don't need an output topic
-                // So we cannot really throw an exception
-            }
-        }
-
         private String getUniqueInput(FunctionConfig functionConfig) {
             if (functionConfig.getInputs().size() + 
functionConfig.getCustomSerdeInputs().size() != 1) {
                 throw new IllegalArgumentException();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 3480fd9..aea9d53 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -63,6 +63,7 @@ import 
org.apache.pulsar.functions.proto.InstanceCommunication;
 import 
org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
+import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -560,19 +561,19 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         Object object;
         // If sink classname is not set, we default pulsar sink
         if (sinkSpec.getClassName().isEmpty()) {
-            PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
-            
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.valueOf(
-                    
this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
-            pulsarSinkConfig.setTopic(sinkSpec.getTopic());
-            pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
-            pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
 
-            Object[] params = {this.client, pulsarSinkConfig};
-            Class[] paramTypes = {PulsarClient.class, PulsarSinkConfig.class};
+            if (StringUtils.isEmpty(sinkSpec.getTopic())) {
+                object = PulsarSinkDisable.INSTANCE;
+            } else {
+                PulsarSinkConfig pulsarSinkConfig = new PulsarSinkConfig();
+                
pulsarSinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees
+                        
.valueOf(this.instanceConfig.getFunctionDetails().getProcessingGuarantees().name()));
+                pulsarSinkConfig.setTopic(sinkSpec.getTopic());
+                
pulsarSinkConfig.setSerDeClassName(sinkSpec.getSerDeClassName());
+                pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
+                object = new PulsarSink(this.client, pulsarSinkConfig);
+            }
 
-            object = Reflections.createInstance(
-                    PulsarSink.class.getName(),
-                    PulsarSink.class.getClassLoader(), params, paramTypes);
         } else {
             object = Reflections.createInstance(
                     sinkSpec.getClassName(),
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
new file mode 100644
index 0000000..8d71313
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSinkDisable.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import java.util.Map;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class PulsarSinkDisable<T> implements Sink<T> {
+
+    public static final PulsarSinkDisable INSTANCE = new PulsarSinkDisable();
+
+    @Override
+    public void close() throws Exception {
+        // No-op
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+        // No-op
+    }
+
+    @Override
+    public void write(Record<T> record) throws Exception {
+        // No-op
+    }
+
+}
diff --git 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 8d91b8e..b0d340f 100644
--- 
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ 
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -29,7 +29,6 @@ import 
org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.NotNull;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isFileExists;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClass;
-import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isImplementationOfClasses;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isListEntryCustom;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isMapEntryCustom;
 import 
org.apache.pulsar.functions.utils.validation.ConfigValidationAnnotations.isPositiveNumber;
@@ -80,6 +79,7 @@ public class FunctionConfig {
     private String topicsPattern;
     @isValidTopicName
     private String output;
+    private boolean skipOutput;
     @isImplementationOfClass(implementsClass = SerDe.class)
     private String outputSerdeClassName;
     @isValidTopicName

Reply via email to