This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b2bf19 Misc Cleanups (#2512)
7b2bf19 is described below
commit 7b2bf197ac5b17e4fcfdb9d6aa2a7fadbe713d37
Author: Sanjeev Kulkarni <[email protected]>
AuthorDate: Wed Sep 5 10:49:26 2018 -0700
Misc Cleanups (#2512)
---
.../apache/pulsar/functions/instance/JavaInstanceRunnable.java | 10 +---------
pulsar-functions/proto/src/main/proto/Function.proto | 2 +-
2 files changed, 2 insertions(+), 10 deletions(-)
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 344d9ee..4ba7340 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -210,14 +210,6 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
stats.incrementProcessed(processAt);
addLogTopicHandler();
JavaExecutionResult result;
- MessageId messageId = null;
- String topicName = null;
-
- if (currentRecord instanceof PulsarRecord) {
- PulsarRecord<?> pulsarRecord = (PulsarRecord<?>)
currentRecord;
- messageId = pulsarRecord.getMessageId();
- topicName = pulsarRecord.getTopicName().get();
- }
result = javaInstance.handleMessage(currentRecord,
currentRecord.getValue());
@@ -511,7 +503,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// If source classname is not set, we default pulsar source
if (sourceSpec.getClassName().isEmpty()) {
PulsarSourceConfig pulsarSourceConfig = new PulsarSourceConfig();
- sourceSpec.getInputSpecs().forEach((topic, conf) -> {
+ sourceSpec.getInputSpecsMap().forEach((topic, conf) -> {
ConsumerConfig consumerConfig =
ConsumerConfig.builder().isRegexPattern(conf.getIsRegexPattern()).build();
if (conf.getSchemaType() != null &&
!conf.getSchemaType().isEmpty()) {
consumerConfig.setSchemaType(conf.getSchemaType());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index eff5d1d..6e969ae 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -85,7 +85,7 @@ message SourceSpec {
map<string, ConsumerSpec> inputSpecs = 10;
uint64 timeoutMs = 6;
- string topicsPattern = 7;
+ string topicsPattern = 7 [deprecated = true];
/* If specified, this will refer to an archive that is
* already present in the server */