This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4694e6e Fix InputTopic Submission for Creating Sink (#2427)
4694e6e is described below
commit 4694e6ee0b2eee26fed2af493ccb21fa39bd8aec
Author: Ali Ahmed <[email protected]>
AuthorDate: Wed Aug 22 21:43:31 2018 -0700
Fix InputTopic Submission for Creating Sink (#2427)
* Fix Spelling mistake in error message
* Fix InputTopic Submission for Creating Sink
---
.../org/apache/pulsar/functions/utils/validation/ValidatorImpls.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index 33a2432..cbeb970 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -547,7 +547,7 @@ public class ValidatorImpls {
if (functionConfig.getTimeoutMs() != null
&& functionConfig.getProcessingGuarantees() != null
&& functionConfig.getProcessingGuarantees() !=
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE) {
- throw new IllegalArgumentException("Message timeout can only
be specifed with processing guarantee is "
+ throw new IllegalArgumentException("Message timeout can only
be specified with processing guarantee is "
+
FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE.name());
}
}
@@ -817,6 +817,9 @@ public class ValidatorImpls {
private static Collection<String> collectAllInputTopics(SinkConfig
sinkConfig) {
List<String> retval = new LinkedList<>();
+ if (sinkConfig.getInputs() != null) {
+ retval.addAll(sinkConfig.getInputs());
+ }
if (sinkConfig.getTopicToSerdeClassName() != null) {
retval.addAll(sinkConfig.getTopicToSerdeClassName().keySet());
}