This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4694e6e  Fix InputTopic Submission for Creating Sink (#2427)
4694e6e is described below

commit 4694e6ee0b2eee26fed2af493ccb21fa39bd8aec
Author: Ali Ahmed <[email protected]>
AuthorDate: Wed Aug 22 21:43:31 2018 -0700

    Fix InputTopic Submission for Creating Sink (#2427)
    
    * Fix Spelling mistake in error message
    
    * Fix InputTopic Submission for Creating Sink
---
 .../org/apache/pulsar/functions/utils/validation/ValidatorImpls.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 33a2432..cbeb970 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -547,7 +547,7 @@ public class ValidatorImpls {
             if (functionConfig.getTimeoutMs() != null
                     && functionConfig.getProcessingGuarantees() != null
                     && functionConfig.getProcessingGuarantees() != 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
-                throw new IllegalArgumentException("Message timeout can only 
be specifed with processing guarantee is "
+                throw new IllegalArgumentException("Message timeout can only 
be specified with processing guarantee is "
                         + 
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
             }
         }
@@ -817,6 +817,9 @@ public class ValidatorImpls {
 
         private static Collection<String> collectAllInputTopics(SinkConfig 
sinkConfig) {
             List<String> retval = new LinkedList<>();
+            if (sinkConfig.getInputs() != null) {
+                retval.addAll(sinkConfig.getInputs());
+            }
             if (sinkConfig.getTopicToSerdeClassName() != null) {
                 retval.addAll(sinkConfig.getTopicToSerdeClassName().keySet());
             }

Reply via email to