This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4cfa83a refactoring functions to use source interface (#1649)
4cfa83a is described below
commit 4cfa83aeeee27e13b258065c19c0fc1488255145
Author: Boyang Jerry Peng <[email protected]>
AuthorDate: Thu Apr 26 13:37:30 2018 -0700
refactoring functions to use source interface (#1649)
* refactoring functions to use source interface
* addressing comments
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 6 +
.../org/apache/pulsar/connect/core/Source.java | 8 +-
pulsar-functions/instance/pom.xml | 6 +
.../{InputMessage.java => InstanceUtils.java} | 46 +++----
.../pulsar/functions/instance/JavaInstance.java | 11 +-
.../functions/instance/JavaInstanceRunnable.java | 139 ++++++++++-----------
.../{InputMessage.java => PulsarConfig.java} | 40 ++----
.../{InputMessage.java => PulsarRecord.java} | 45 ++++---
.../pulsar/functions/instance/PulsarSource.java | 120 ++++++++++++++++++
.../instance/processors/AtLeastOnceProcessor.java | 11 +-
.../instance/processors/AtMostOnceProcessor.java | 11 +-
.../processors/EffectivelyOnceProcessor.java | 43 +++----
.../instance/processors/MessageProcessor.java | 23 ++--
.../instance/processors/MessageProcessorBase.java | 97 +++++++-------
.../functions/instance/JavaInstanceTest.java | 3 +-
.../proto/src/main/proto/Function.proto | 6 +
.../pulsar/functions/runtime/JavaInstanceMain.java | 18 +++
.../pulsar/functions/runtime/ProcessRuntime.java | 7 ++
.../pulsar/functions/runtime/ThreadRuntime.java | 11 +-
.../functions/runtime/ProcessRuntimeTest.java | 15 ++-
.../functions/utils/FunctionDetailsUtils.java | 11 --
.../apache/pulsar/functions/utils/Reflections.java | 34 +++++
22 files changed, 446 insertions(+), 265 deletions(-)
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 0e4d8cc..c1446ed 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -45,6 +45,7 @@ import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.instance.PulsarSource;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
@@ -54,6 +55,7 @@ import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
@@ -837,6 +839,10 @@ public class CmdFunctions extends CmdBase {
if (functionConfig.getInputs() != null) {
functionDetailsBuilder.setTenant(functionConfig.getTenant());
}
+ functionDetailsBuilder.setSource(
+ ConnectorDetails.newBuilder()
+ .setClassName(PulsarSource.class.getName())
+ .build());
if (functionConfig.getNamespace() != null) {
functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
}
diff --git
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
index 40f1820..2a41336 100644
---
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
@@ -27,12 +27,12 @@ public interface Source<T> extends AutoCloseable {
* @param config initialization config
* @throws Exception IO type exceptions when opening a connector
*/
- void open(final Map<String, String> config) throws Exception;
+ void open(final Map<String, Object> config) throws Exception;
/**
- * Reads the next message from source, if one exists, and returns. This
call should be non-blocking.
- * If source does not have any new messages, return null immediately.
- * @return next message from source or null, if no new messages are
available.
+ * Reads the next message from source.
+ * If source does not have any new messages, this call should block.
+ * @return next message from source. The return result should never be
null
* @throws Exception
*/
Record<T> read() throws Exception;
diff --git a/pulsar-functions/instance/pom.xml
b/pulsar-functions/instance/pom.xml
index ba5183a..3e4cfa1 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -54,6 +54,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-connect-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
</dependency>
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
similarity index 50%
copy from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
copy to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index bd7e788..7233659 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -18,39 +18,29 @@
*/
package org.apache.pulsar.functions.instance;
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.Reflections;
-@Getter
-@Setter
-@ToString
-public class InputMessage {
-
- private Message actualMessage;
- String topicName;
- SerDe inputSerDe;
- Consumer consumer;
-
- public int getTopicPartition() {
- MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
- return msgId.getPartitionIndex();
- }
-
- public void ack() {
- if (null != consumer) {
- consumer.acknowledgeAsync(actualMessage);
+public class InstanceUtils {
+ public static SerDe initializeSerDe(String serdeClassName, ClassLoader
clsLoader,
+ Class<?> type) {
+ if (null == serdeClassName || serdeClassName.isEmpty()) {
+ return null;
+ } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
+ return initializeDefaultSerDe(type);
+ } else {
+ return Reflections.createInstance(
+ serdeClassName,
+ SerDe.class,
+ clsLoader);
}
}
- public void ackCumulative() {
- if (null != consumer) {
- consumer.acknowledgeCumulativeAsync(actualMessage);
+ public static SerDe initializeDefaultSerDe(Class<?> type) {
+ if (!DefaultSerDe.IsSupportedType(type)) {
+ throw new RuntimeException("Default Serializer does not support "
+ type);
}
+ return new DefaultSerDe(type);
}
-
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 2133c35..67f69bf 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,9 +21,9 @@ package org.apache.pulsar.functions.instance;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -48,11 +48,16 @@ public class JavaInstance implements AutoCloseable {
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
PulsarClient pulsarClient,
- Consumer inputConsumer) {
+ Source source) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" +
config.getFunctionDetails().getName());
- this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader, inputConsumer);
+ if (source instanceof PulsarSource) {
+ this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader,
+ ((PulsarSource) source).getInputConsumer());
+ } else {
+ this.context = null;
+ }
// create the functions
if (userClassObject instanceof Function) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4349860..ce5220f 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -30,6 +30,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AccessLevel;
import lombok.Getter;
@@ -49,8 +50,10 @@ import org.apache.logging.log4j.core.LoggerContext;
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.processors.MessageProcessor;
@@ -88,7 +91,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
@Getter
private Exception failureException;
private JavaInstance javaInstance;
- private volatile boolean running = true;
+ private AtomicBoolean running = new AtomicBoolean(true);
@Getter(AccessLevel.PACKAGE)
private Map<String, SerDe> inputSerDe;
@@ -101,6 +104,8 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// function stats
private final FunctionStats stats;
+ private Record currentRecord;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -159,7 +164,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
// start any log topic handler
setupLogHandler();
- return new JavaInstance(instanceConfig, object, clsLoader, client,
processor.getInputConsumer());
+ return new JavaInstance(instanceConfig, object, clsLoader, client,
processor.getSource());
}
/**
@@ -169,9 +174,11 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
public void run() {
try {
javaInstance = setupJavaInstance();
- while (running) {
+ while (running.get()) {
+
+ currentRecord = processor.recieveMessage();
- InputMessage currentMessage = processor.recieveMessage();
+ processor.postReceiveMessage(currentRecord);
// state object is per function, because we need to have the
ability to know what updates
// are made in this function and ensure we only acknowledge
after the state is persisted.
@@ -184,20 +191,20 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
}
// process the message
- Object input;
- try {
- input =
currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData());
- } catch (Exception ex) {
-
stats.incrementDeserializationExceptions(currentMessage.getTopicName());
- throw ex;
- }
long processAt = System.currentTimeMillis();
stats.incrementProcessed(processAt);
addLogTopicHandler();
- JavaExecutionResult result = javaInstance.handleMessage(
- currentMessage.getActualMessage().getMessageId(),
- currentMessage.getTopicName(),
- input);
+ JavaExecutionResult result;
+ MessageId messageId = null;
+ String topicName = null;
+
+ if (currentRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+ messageId = pulsarRecord.getMessageId();
+ topicName = pulsarRecord.getTopicName();
+ }
+ result = javaInstance.handleMessage(messageId, topicName,
currentRecord.getValue());
+
removeLogTopicHandler();
long doneProcessing = System.currentTimeMillis();
@@ -209,18 +216,25 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
try {
completableFuture.join();
} catch (Exception e) {
- log.error("Failed to flush the state updates of
message {}", currentMessage, e);
- throw e;
+ log.error("Failed to flush the state updates of
message {}", currentRecord, e);
+ currentRecord.fail();
}
}
- processResult(currentMessage, result, processAt,
doneProcessing);
+ try {
+ processResult(currentRecord, result, processAt,
doneProcessing);
+ } catch (Exception e) {
+ log.warn("Failed to process result of message {}",
currentRecord, e);
+ currentRecord.fail();
+ }
}
-
- javaInstance.close();
} catch (Exception ex) {
- log.info("Uncaught exception in Java Instance", ex);
- failureException = ex;
- throw new RuntimeException(ex);
+ log.error("Uncaught exception in Java Instance", ex);
+ if (running.get()) {
+ failureException = ex;
+ throw new RuntimeException(ex);
+ }
+ } finally {
+ close();
}
}
@@ -286,15 +300,15 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
this.stateTable = result(storageClient.openTable(tableName));
}
- private void processResult(InputMessage msg,
+ private void processResult(Record srcRecord,
JavaExecutionResult result,
long startTime, long endTime) throws Exception {
if (result.getUserException() != null) {
- log.info("Encountered user exception when processing message {}",
msg, result.getUserException());
+ log.info("Encountered user exception when processing message {}",
srcRecord, result.getUserException());
stats.incrementUserExceptions(result.getUserException());
- throw result.getUserException();
+ this.currentRecord.fail();
} else if (result.getSystemException() != null) {
- log.info("Encountered system exception when processing message
{}", msg, result.getSystemException());
+ log.info("Encountered system exception when processing message
{}", srcRecord, result.getSystemException());
stats.incrementSystemExceptions(result.getSystemException());
throw result.getSystemException();
} else {
@@ -308,35 +322,47 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
throw ex;
}
if (output != null) {
- sendOutputMessage(msg, output);
+ sendOutputMessage(srcRecord, output);
} else {
- processor.sendOutputMessage(msg, null);
+ processor.sendOutputMessage(srcRecord, null);
}
} else {
// the function doesn't produce any result or the user doesn't
want the result.
- processor.sendOutputMessage(msg, null);
+ processor.sendOutputMessage(srcRecord, null);
}
}
}
- private void sendOutputMessage(InputMessage srcMsg,
+ private void sendOutputMessage(Record srcRecord,
byte[] output) throws Exception {
- MessageBuilder msgBuilder = MessageBuilder.create()
- .setContent(output)
- .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
- .setProperty("__pfn_input_msg_id__", new
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
- processor.sendOutputMessage(srcMsg, msgBuilder);
+ MessageBuilder msgBuilder = MessageBuilder.create();
+ if (srcRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+ msgBuilder
+ .setContent(output)
+ .setProperty("__pfn_input_topic__",
pulsarMessage.getTopicName())
+ .setProperty("__pfn_input_msg_id__", new
String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray())));
+ }
+
+ processor.sendOutputMessage(srcRecord, msgBuilder);
+ }
+
+ /**
+ * Stop java instance runnable
+ */
+ public void stop() {
+ this.running.set(false);
}
@Override
public void close() {
- if (!running) {
+ if (!running.get()) {
return;
}
- running = false;
processor.close();
+ javaInstance.close();
// kill the state table
if (null != stateTable) {
@@ -401,39 +427,12 @@ public class JavaInstanceRunnable implements
AutoCloseable, Runnable {
bldr.putMetrics(metricName, digest);
}
- private static SerDe initializeSerDe(String serdeClassName, ClassLoader
clsLoader,
- Class<?>[] typeArgs, boolean
inputArgs) {
- if (null == serdeClassName || serdeClassName.isEmpty()) {
- return null;
- } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
- return initializeDefaultSerDe(typeArgs, inputArgs);
- } else {
- return Reflections.createInstance(
- serdeClassName,
- SerDe.class,
- clsLoader);
- }
- }
-
- private static SerDe initializeDefaultSerDe(Class<?>[] typeArgs, boolean
inputArgs) {
- if (inputArgs) {
- if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
- throw new RuntimeException("Default Serializer does not
support " + typeArgs[0]);
- }
- return new DefaultSerDe(typeArgs[0]);
- } else {
- if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
- throw new RuntimeException("Default Serializer does not
support " + typeArgs[1]);
- }
- return new DefaultSerDe(typeArgs[1]);
- }
- }
-
private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
+
this.inputSerDe = new HashMap<>();
-
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) ->
this.inputSerDe.put(k, initializeSerDe(v, clsLoader, typeArgs, true)));
+
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) ->
this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader,
typeArgs[0])));
for (String topicName :
instanceConfig.getFunctionDetails().getInputsList()) {
- this.inputSerDe.put(topicName, initializeDefaultSerDe(typeArgs,
true));
+ this.inputSerDe.put(topicName,
InstanceUtils.initializeDefaultSerDe(typeArgs[0]));
}
if (Void.class.equals(typeArgs[0])) {
@@ -458,9 +457,9 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
if (instanceConfig.getFunctionDetails().getOutputSerdeClassName()
== null
||
instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
||
instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName()))
{
- outputSerDe = initializeDefaultSerDe(typeArgs, false);
+ outputSerDe =
InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
} else {
- this.outputSerDe =
initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(),
clsLoader, typeArgs, false);
+ this.outputSerDe =
InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(),
clsLoader, typeArgs[1]);
}
Class<?>[] outputSerdeTypeArgs =
TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
if
(outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
similarity index 58%
copy from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
copy to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
index bd7e788..c812a77 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
@@ -18,39 +18,25 @@
*/
package org.apache.pulsar.functions.instance;
+import lombok.Builder;
+import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.Map;
@Getter
@Setter
+@Data
+@Builder
@ToString
-public class InputMessage {
-
- private Message actualMessage;
- String topicName;
- SerDe inputSerDe;
- Consumer consumer;
-
- public int getTopicPartition() {
- MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
- return msgId.getPartitionIndex();
- }
-
- public void ack() {
- if (null != consumer) {
- consumer.acknowledgeAsync(actualMessage);
- }
- }
-
- public void ackCumulative() {
- if (null != consumer) {
- consumer.acknowledgeCumulativeAsync(actualMessage);
- }
- }
-
+public class PulsarConfig {
+ private Function.FunctionDetails.ProcessingGuarantees processingGuarantees;
+ private SubscriptionType subscriptionType;
+ private String subscription;
+ private Map<String, SerDe> topicToSerdeMap;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
similarity index 56%
rename from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
index bd7e788..2bdbdb1 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
@@ -18,39 +18,36 @@
*/
package org.apache.pulsar.functions.instance;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
-import lombok.Setter;
import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
-import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.connect.core.Record;
+@Data
+@Builder
@Getter
-@Setter
@ToString
-public class InputMessage {
+@EqualsAndHashCode
+public class PulsarRecord<T> implements Record<T> {
- private Message actualMessage;
- String topicName;
- SerDe inputSerDe;
- Consumer consumer;
-
- public int getTopicPartition() {
- MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
- return msgId.getPartitionIndex();
- }
+ private String partitionId;
+ private Long sequenceId;
+ private T value;
+ private MessageId messageId;
+ private String topicName;
+ private Runnable failFunction;
+ private Runnable ackFunction;
+ @Override
public void ack() {
- if (null != consumer) {
- consumer.acknowledgeAsync(actualMessage);
- }
+ this.ackFunction.run();
}
- public void ackCumulative() {
- if (null != consumer) {
- consumer.acknowledgeCumulativeAsync(actualMessage);
- }
+ @Override
+ public void fail() {
+ this.failFunction.run();
}
-
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
new file mode 100644
index 0000000..43d9350
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class PulsarSource<T> implements Source<T> {
+
+ private PulsarClient pulsarClient;
+ private PulsarConfig pulsarConfig;
+
+ @Getter
+ private org.apache.pulsar.client.api.Consumer inputConsumer;
+
+ public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) {
+ this.pulsarClient = pulsarClient;
+ this.pulsarConfig = pulsarConfig;
+ }
+
+ @Override
+ public void open(Map<String, Object> config) throws Exception {
+ this.inputConsumer = this.pulsarClient.newConsumer()
+ .topics(new
ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet()))
+ .subscriptionName(this.pulsarConfig.getSubscription())
+ .subscriptionType(this.pulsarConfig.getSubscriptionType())
+ .ackTimeout(1, TimeUnit.MINUTES)
+ .subscribe();
+ }
+
+ @Override
+ public Record<T> read() throws Exception {
+ org.apache.pulsar.client.api.Message<T> message =
this.inputConsumer.receive();
+
+ String topicName;
+ String partitionId;
+
+ // If more than one topics are being read than the Message return by
the consumer will be TopicMessageImpl
+ // If there is only topic being read then the Message returned by the
consumer wil be MessageImpl
+ if (message instanceof TopicMessageImpl) {
+ topicName = ((TopicMessageImpl) message).getTopicName();
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
message.getMessageId();
+ MessageIdImpl messageId = (MessageIdImpl)
topicMessageId.getInnerMessageId();
+ partitionId = Long.toString(messageId.getPartitionIndex());
+ } else {
+ topicName =
this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next();
+ partitionId = Long.toString(((MessageIdImpl)
message.getMessageId()).getPartitionIndex());
+ }
+
+ Object object;
+ try {
+ object =
this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData());
+ } catch (Exception e) {
+ //TODO Add deserialization exception stats
+ throw new RuntimeException("Error occured when attempting to
deserialize input:", e);
+ }
+
+ T input;
+ try {
+ input = (T) object;
+ } catch (ClassCastException e) {
+ throw new RuntimeException("Error in casting input to expected
type:", e);
+ }
+
+ PulsarRecord<T> pulsarMessage = (PulsarRecord<T>)
PulsarRecord.builder()
+ .value(input)
+ .messageId(message.getMessageId())
+ .partitionId(partitionId)
+ .sequenceId(message.getSequenceId())
+ .topicName(topicName)
+ .ackFunction(() -> {
+ if (pulsarConfig.getProcessingGuarantees()
+ ==
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ inputConsumer.acknowledgeCumulativeAsync(message);
+ } else {
+ inputConsumer.acknowledgeAsync(message);
+ }
+ }).failFunction(() -> {
+ if (pulsarConfig.getProcessingGuarantees()
+ ==
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new RuntimeException("Failed to process message:
" + message.getMessageId());
+ }
+ })
+ .build();
+ return pulsarMessage;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.inputConsumer.close();
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 86aeb83..465d198 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.concurrent.LinkedBlockingDeque;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
@@ -27,7 +26,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -52,15 +51,17 @@ public class AtLeastOnceProcessor extends
MessageProcessorBase {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
+ public void sendOutputMessage(Record srcRecord, MessageBuilder
outputMsgBuilder) {
if (null == outputMsgBuilder || null == producer) {
- inputMsg.ack();
+ srcRecord.ack();
return;
}
Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
- .thenAccept(msgId -> inputMsg.ack());
+ .thenAccept(msgId -> {
+ srcRecord.ack();
+ });
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index 994be0d..08cf111 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.concurrent.LinkedBlockingDeque;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
@@ -26,7 +25,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -45,10 +44,10 @@ class AtMostOnceProcessor extends MessageProcessorBase {
}
@Override
- protected void postReceiveMessage(InputMessage message) {
- super.postReceiveMessage(message);
+ public void postReceiveMessage(Record record) {
+ super.postReceiveMessage(record);
if (functionDetails.getAutoAck()) {
- message.ack();
+ record.ack();
}
}
@@ -58,7 +57,7 @@ class AtMostOnceProcessor extends MessageProcessorBase {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
+ public void sendOutputMessage(Record srcRecord, MessageBuilder
outputMsgBuilder) {
if (null == outputMsgBuilder) {
return;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index 59c7bd6..aafca69 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -18,31 +18,22 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.functions.instance.PulsarRecord;
import
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
/**
* A message processor that process messages effectively-once.
@@ -50,8 +41,6 @@ import org.apache.pulsar.functions.utils.Utils;
@Slf4j
class EffectivelyOnceProcessor extends MessageProcessorBase implements
ConsumerEventListener {
- private LinkedList<String> inputTopicsToResubscribe = null;
-
@Getter(AccessLevel.PACKAGE)
protected Producers outputProducer;
@@ -103,24 +92,28 @@ class EffectivelyOnceProcessor extends
MessageProcessorBase implements ConsumerE
//
@Override
- public void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder<byte[]> outputMsgBuilder)
throws Exception {
+ public void sendOutputMessage(Record srcRecord,
+ MessageBuilder outputMsgBuilder) throws
Exception {
if (null == outputMsgBuilder) {
- inputMsg.ackCumulative();
+ srcRecord.ack();
return;
}
// assign sequence id to output message for idempotent producing
outputMsgBuilder = outputMsgBuilder
-
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
-
-
- Producer<byte[]> producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
-
- Message<byte[]> outputMsg = outputMsgBuilder.build();
- producer.sendAsync(outputMsg)
- .thenAccept(messageId -> inputMsg.ackCumulative())
- .join();
+ .setSequenceId(srcRecord.getRecordSequence());
+
+ // currently on PulsarRecord
+ if (srcRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+ Producer producer =
outputProducer.getProducer(pulsarMessage.getTopicName(),
+ Integer.parseInt(srcRecord.getPartitionId()));
+
+ org.apache.pulsar.client.api.Message outputMsg =
outputMsgBuilder.build();
+ producer.sendAsync(outputMsg)
+ .thenAccept(messageId -> srcRecord.ack())
+ .join();
+ }
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 97c971d..fd22adb 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -19,16 +19,15 @@
package org.apache.pulsar.functions.instance.processors;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
+
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import
org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
@@ -68,6 +67,8 @@ public interface MessageProcessor extends AutoCloseable {
}
}
+ void postReceiveMessage(Record record);
+
/**
* Setup the input with a provided <i>processQueue</i>. The implementation
of this processor is responsible for
* setting up the input and passing the received messages from input to
the provided <i>processQueue</i>.
@@ -78,11 +79,11 @@ public interface MessageProcessor extends AutoCloseable {
throws Exception;
/**
- * Return the input.
+ * Return the source.
*
- * @return the input consumer.
+ * @return the source.
*/
- Consumer<byte[]> getInputConsumer();
+ Source getSource();
/**
* Setup the output with a provided <i>outputSerDe</i>. The implementation
of this processor is responsible for
@@ -99,18 +100,18 @@ public interface MessageProcessor extends AutoCloseable {
* <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't
have to send any messages to the output.
* The implementation can decide to acknowledge the input message based on
its process guarantees.
*
- * @param inputMsg input message
+ * @param srcRecord record from source
* @param outputMsgBuilder output message builder. it can be null.
*/
- void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder<byte[]> outputMsgBuilder) throws
PulsarClientException, Exception;
+ void sendOutputMessage(Record srcRecord,
+ MessageBuilder outputMsgBuilder) throws
PulsarClientException, Exception;
/**
* Get the next message to process
* @return the next input message
* @throws Exception
*/
- InputMessage recieveMessage() throws Exception;
+ Record recieveMessage() throws Exception;
@Override
void close();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index fc37b13..06bc175 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -25,18 +25,17 @@ import java.util.Map;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
+import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.instance.PulsarConfig;
+import org.apache.pulsar.functions.instance.PulsarSource;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.Reflections;
/**
* The base implementation of {@link MessageProcessor}.
@@ -48,12 +47,8 @@ abstract class MessageProcessorBase implements
MessageProcessor {
protected final FunctionDetails functionDetails;
protected final SubscriptionType subType;
- protected Map<String, SerDe> inputSerDe;
-
- protected SerDe outputSerDe;
-
@Getter
- protected Consumer<byte[]> inputConsumer;
+ protected Source source;
protected List<String> topics;
@@ -64,6 +59,8 @@ abstract class MessageProcessorBase implements
MessageProcessor {
this.functionDetails = functionDetails;
this.subType = subType;
this.topics = new LinkedList<>();
+ this.topics.addAll(this.functionDetails.getInputsList());
+
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
}
//
@@ -72,36 +69,52 @@ abstract class MessageProcessorBase implements
MessageProcessor {
@Override
public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
- log.info("Setting up input with input serdes: {}", inputSerDe);
- this.inputSerDe = inputSerDe;
-
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
- this.topics.addAll(this.functionDetails.getInputsList());
- this.inputConsumer = this.client.newConsumer()
- .topics(this.topics)
-
.subscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
- .subscriptionType(getSubscriptionType())
- .subscribe();
+ org.apache.pulsar.functions.proto.Function.ConnectorDetails
connectorDetails = this.functionDetails.getSource();
+ Object object;
+ if
(connectorDetails.getClassName().equals(PulsarSource.class.getName())) {
+ PulsarConfig pulsarConfig = PulsarConfig.builder()
+ .topicToSerdeMap(inputSerDe)
+
.subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
+
.processingGuarantees(this.functionDetails.getProcessingGuarantees())
+ .subscriptionType(this.subType)
+ .build();
+ Object[] params = {this.client, pulsarConfig};
+ Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
+ object = Reflections.createInstance(
+ connectorDetails.getClassName(),
+ PulsarSource.class.getClassLoader(), params, paramTypes);
+
+ } else {
+ object = Reflections.createInstance(
+ connectorDetails.getClassName(),
+ Thread.currentThread().getContextClassLoader());
+ }
+
+ Class<?>[] typeArgs;
+ if (object instanceof Source) {
+ typeArgs = TypeResolver.resolveRawArguments(Source.class,
object.getClass());
+ assert typeArgs.length > 0;
+ } else {
+ throw new RuntimeException("Source does not implement correct
interface");
+ }
+ this.source = (Source) object;
+
+ try {
+ this.source.open(connectorDetails.getConfigsMap());
+ } catch (Exception e) {
+ log.info("Error occurred executing open for source: {}",
+ this.functionDetails.getSource().getClassName(), e);
+ }
+
}
protected SubscriptionType getSubscriptionType() {
return subType;
}
- public InputMessage recieveMessage() throws PulsarClientException {
- Message message = this.inputConsumer.receive();
- String topicName;
- if (message instanceof TopicMessageImpl) {
- topicName = ((TopicMessageImpl)message).getTopicName();
- } else {
- topicName = this.topics.get(0);
- }
- InputMessage inputMessage = new InputMessage();
- inputMessage.setConsumer(inputConsumer);
- inputMessage.setInputSerDe(inputSerDe.get(topicName));
- inputMessage.setActualMessage(message);
- inputMessage.setTopicName(topicName);
- return inputMessage;
+ public Record recieveMessage() throws Exception {
+ return this.source.read();
}
/**
@@ -110,9 +123,10 @@ abstract class MessageProcessorBase implements
MessageProcessor {
* <p>The processor implementation can make a decision to process the
message based on its processing guarantees.
* for example, an at-most-once processor can ack the message immediately.
*
- * @param message input message.
+ * @param record input message.
*/
- protected void postReceiveMessage(InputMessage message) {}
+ @Override
+ public void postReceiveMessage(Record record) {}
//
// Output
@@ -120,8 +134,6 @@ abstract class MessageProcessorBase implements
MessageProcessor {
@Override
public void setupOutput(SerDe outputSerDe) throws Exception {
- this.outputSerDe = outputSerDe;
-
String outputTopic = functionDetails.getOutput();
if (outputTopic != null
&& !functionDetails.getOutput().isEmpty()
@@ -141,10 +153,9 @@ abstract class MessageProcessorBase implements
MessageProcessor {
public void close() {
try {
- this.inputConsumer.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to close consumer to input topics {}",
- ((MultiTopicsConsumerImpl)
this.inputConsumer).getTopics(), e);
+ this.source.close();
+ } catch (Exception e) {
+ log.warn("Failed to close source {}", this.source, e);
}
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index b9e762a..c0aae7c 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.instance;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -50,7 +51,7 @@ public class JavaInstanceTest {
JavaInstance instance = new JavaInstance(
config,
(Function<String, String>) (input, context) -> input + "-lambda",
- null, null, null);
+ null, null, mock(PulsarSource.class));
String testString = "ABC123";
JavaExecutionResult result =
instance.handleMessage(MessageId.earliest, "random", testString);
assertNotNull(result.getResult());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index c9b3da2..1aeb21a 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -53,6 +53,12 @@ message FunctionDetails {
bool autoAck = 13;
repeated string inputs = 14;
int32 parallelism = 15;
+ ConnectorDetails source = 16;
+}
+
+message ConnectorDetails {
+ string className = 1;
+ map<string, string> configs = 4;
}
message PackageLocationMetaData {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 88f1ac7..4b14dad 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -30,6 +30,7 @@ import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function.ConnectorDetails;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -106,6 +107,13 @@ public class JavaInstanceMain {
@Parameter(names = "--subscription_type", description = "What subscription
type to use")
protected FunctionDetails.SubscriptionType subscriptionType;
+ @Parameter(names = "--source_classname", description = "The source
classname")
+ protected String sourceClassname;
+
+ @Parameter(names = "--source_configs", description = "The source
classname")
+ protected String sourceConfigs;
+
+
private Server server;
public JavaInstanceMain() { }
@@ -159,6 +167,16 @@ public class JavaInstanceMain {
Map<String, String> userConfigMap = new
Gson().fromJson(userConfig, type);
functionDetailsBuilder.putAllUserConfig(userConfigMap);
}
+
+ ConnectorDetails.Builder sourceDetailsBuilder =
ConnectorDetails.newBuilder();
+ sourceDetailsBuilder.setClassName(sourceClassname);
+ if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+ Type type = new TypeToken<Map<String, String>>(){}.getType();
+ Map<String, String> sourceConfigMap = new
Gson().fromJson(sourceConfigs, type);
+ sourceDetailsBuilder.putAllConfigs(sourceConfigMap);
+ }
+ functionDetailsBuilder.setSource(sourceDetailsBuilder);
+
FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index def06cf..8da2b9d 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -175,6 +175,13 @@ class ProcessRuntime implements Runtime {
instancePort = findAvailablePort();
args.add("--port");
args.add(String.valueOf(instancePort));
+ args.add("--source_classname");
+
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
+ Map<String, String> sourceConfigs =
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
+ if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+ args.add("--source_config");
+ args.add(new Gson().toJson(sourceConfigs));
+ }
return args;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 830e18a..136951e 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -79,7 +79,15 @@ class ThreadRuntime implements Runtime {
@Override
public void uncaughtException(Thread t, Throwable e) {
startupException = new Exception(e);
+ log.error("Error occured in java instance:", e);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ //ignore
+ }
+ // restart
start();
+
}
});
this.fnThread.start();
@@ -95,9 +103,10 @@ class ThreadRuntime implements Runtime {
@Override
public void stop() {
if (fnThread != null) {
+ // Stop instance thread
+ javaInstanceRunnable.stop();
// interrupt the instance thread
fnThread.interrupt();
- javaInstanceRunnable.close();
try {
fnThread.join();
} catch (InterruptedException e) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 7042c26..67ac82f 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.ProcessRuntime;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
@@ -80,6 +81,8 @@ public class ProcessRuntimeTest {
functionDetailsBuilder.setOutput(TEST_NAME + "-output");
functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+ functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder()
+ .setClassName("org.pulsar.pulsar.TestSource"));
return functionDetailsBuilder.build();
}
@@ -101,8 +104,7 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 43);
- args.remove(args.size() - 1);
+ assertEquals(args.size(), 45);
String expectedArgs = "java -cp " + javaInstanceJarFile + "
-Dlog4j.configurationFile=java_instance_log4j2.yml "
+ "-Dpulsar.log.dir=" + logDirectory + "/functions" + "
-Dpulsar.log.file=" + config.getFunctionDetails().getName()
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -120,7 +122,8 @@ public class ProcessRuntimeTest {
+ " --output_serde_classname " +
config.getFunctionDetails().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port";
+ + " --max_buffered_tuples 1024 --port " + args.get(42)
+ + " --source_classname " +
config.getFunctionDetails().getSource().getClassName();
assertEquals(expectedArgs, String.join(" ", args));
}
@@ -130,8 +133,7 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 42);
- args.remove(args.size() - 1);
+ assertEquals(args.size(), 44);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " +
config.getFunctionDetails().getName() + " --instance_id "
@@ -148,7 +150,8 @@ public class ProcessRuntimeTest {
+ " --output_serde_classname " +
config.getFunctionDetails().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port";
+ + " --max_buffered_tuples 1024 --port " + args.get(41)
+ + " --source_classname " +
config.getFunctionDetails().getSource().getClassName();
assertEquals(expectedArgs, String.join(" ", args));
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
index 7706030..5ecdd4f 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
@@ -43,17 +43,6 @@ public class FunctionDetailsUtils {
return fullyQualifiedName.split("/")[2];
}
- public static boolean areAllRequiredFieldsPresent(FunctionDetails
FunctionDetails) {
- if (FunctionDetails.getTenant() == null ||
FunctionDetails.getNamespace() == null
- || FunctionDetails.getName() == null ||
FunctionDetails.getClassName() == null
- || (FunctionDetails.getInputsCount() <= 0 &&
FunctionDetails.getCustomSerdeInputsCount() <= 0)
- || FunctionDetails.getParallelism() <= 0) {
- return false;
- } else {
- return true;
- }
- }
-
public static String getDownloadFileName(FunctionDetails FunctionDetails) {
String[] hierarchy = FunctionDetails.getClassName().split("\\.");
String fileName;
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 890195c..64418bb 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -116,6 +116,40 @@ public class Reflections {
}
+ public static Object createInstance(String userClassName,
+ ClassLoader classLoader, Object[]
params, Class[] paramTypes) {
+ if (params.length != paramTypes.length) {
+ throw new RuntimeException(
+ "Unequal number of params and paramTypes. Each param must
have a correspoinding param type");
+ }
+ Class<?> theCls;
+ try {
+ theCls = Class.forName(userClassName, true, classLoader);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("User class must be in class path",
cnfe);
+ }
+ Object result;
+ try {
+ Constructor<?> meth = constructorCache.get(theCls);
+ if (null == meth) {
+ meth = theCls.getDeclaredConstructor(paramTypes);
+ meth.setAccessible(true);
+ constructorCache.put(theCls, meth);
+ }
+ result = meth.newInstance(params);
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("User class must be concrete", ie);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("User class doesn't have such method",
e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("User class must have a no-arg
constructor", e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("User class constructor throws
exception", e);
+ }
+ return result;
+
+ }
+
public static Object createInstance(String userClassName, java.io.File
jar) {
try {
return createInstance(userClassName, loadJar(jar));
--
To stop receiving notification emails like this one, please contact
[email protected].