sijie closed pull request #1649: refactoring functions to use source interface
URL: https://github.com/apache/incubator-pulsar/pull/1649
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one opened
from a fork) once it has been merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 52feab9007..687319663a 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -45,6 +45,7 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.functions.api.Function;
+import org.apache.pulsar.functions.instance.PulsarSource;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
@@ -54,6 +55,7 @@
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBuf;
import org.apache.pulsar.functions.shaded.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.functions.shaded.io.netty.buffer.Unpooled;
+import org.apache.pulsar.functions.shaded.proto.Function.ConnectorDetails;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
@@ -837,6 +839,10 @@ private FunctionDetails convert(FunctionConfig
functionConfig)
if (functionConfig.getInputs() != null) {
functionDetailsBuilder.setTenant(functionConfig.getTenant());
}
+ functionDetailsBuilder.setSource(
+ ConnectorDetails.newBuilder()
+ .setClassName(PulsarSource.class.getName())
+ .build());
if (functionConfig.getNamespace() != null) {
functionDetailsBuilder.setNamespace(functionConfig.getNamespace());
}
diff --git
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
index 40f1820709..2a4133632e 100644
---
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
@@ -27,12 +27,12 @@
* @param config initialization config
* @throws Exception IO type exceptions when opening a connector
*/
- void open(final Map<String, String> config) throws Exception;
+ void open(final Map<String, Object> config) throws Exception;
/**
- * Reads the next message from source, if one exists, and returns. This
call should be non-blocking.
- * If source does not have any new messages, return null immediately.
- * @return next message from source or null, if no new messages are
available.
+ * Reads the next message from source.
+ * If source does not have any new messages, this call should block.
+ * @return next message from source. The return result should never be
null
* @throws Exception
*/
Record<T> read() throws Exception;
diff --git a/pulsar-functions/instance/pom.xml
b/pulsar-functions/instance/pom.xml
index ba5183a967..3e4cfa14b2 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -53,6 +53,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-connect-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>stream-storage-java-client</artifactId>
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
new file mode 100644
index 0000000000..72336596ee
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.api.utils.DefaultSerDe;
+import org.apache.pulsar.functions.utils.Reflections;
+
+public class InstanceUtils {
+ public static SerDe initializeSerDe(String serdeClassName, ClassLoader
clsLoader,
+ Class<?> type) {
+ if (null == serdeClassName || serdeClassName.isEmpty()) {
+ return null;
+ } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
+ return initializeDefaultSerDe(type);
+ } else {
+ return Reflections.createInstance(
+ serdeClassName,
+ SerDe.class,
+ clsLoader);
+ }
+ }
+
+ public static SerDe initializeDefaultSerDe(Class<?> type) {
+ if (!DefaultSerDe.IsSupportedType(type)) {
+ throw new RuntimeException("Default Serializer does not support " + type);
+ }
+ return new DefaultSerDe(type);
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 2133c3565c..67f69bfc07 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,9 +21,9 @@
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
@@ -48,11 +48,16 @@
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
PulsarClient pulsarClient,
- Consumer inputConsumer) {
+ Source source) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" +
config.getFunctionDetails().getName());
- this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader, inputConsumer);
+ if (source instanceof PulsarSource) {
+ this.context = new ContextImpl(config, instanceLog, pulsarClient,
clsLoader,
+ ((PulsarSource) source).getInputConsumer());
+ } else {
+ this.context = null;
+ }
// create the functions
if (userClassObject instanceof Function) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4349860d83..ce5220f640 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -30,6 +30,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AccessLevel;
import lombok.Getter;
@@ -49,8 +50,10 @@
import org.apache.logging.log4j.core.config.Configuration;
import org.apache.logging.log4j.core.config.LoggerConfig;
import org.apache.pulsar.client.api.MessageBuilder;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.connect.core.Record;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.processors.MessageProcessor;
@@ -88,7 +91,7 @@
@Getter
private Exception failureException;
private JavaInstance javaInstance;
- private volatile boolean running = true;
+ private AtomicBoolean running = new AtomicBoolean(true);
@Getter(AccessLevel.PACKAGE)
private Map<String, SerDe> inputSerDe;
@@ -101,6 +104,8 @@
// function stats
private final FunctionStats stats;
+ private Record currentRecord;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -159,7 +164,7 @@ JavaInstance setupJavaInstance() throws Exception {
// start any log topic handler
setupLogHandler();
- return new JavaInstance(instanceConfig, object, clsLoader, client,
processor.getInputConsumer());
+ return new JavaInstance(instanceConfig, object, clsLoader, client,
processor.getSource());
}
/**
@@ -169,9 +174,11 @@ JavaInstance setupJavaInstance() throws Exception {
public void run() {
try {
javaInstance = setupJavaInstance();
- while (running) {
+ while (running.get()) {
+
+ currentRecord = processor.recieveMessage();
- InputMessage currentMessage = processor.recieveMessage();
+ processor.postReceiveMessage(currentRecord);
// state object is per function, because we need to have the
ability to know what updates
// are made in this function and ensure we only acknowledge
after the state is persisted.
@@ -184,20 +191,20 @@ public void run() {
}
// process the message
- Object input;
- try {
- input =
currentMessage.getInputSerDe().deserialize(currentMessage.getActualMessage().getData());
- } catch (Exception ex) {
-
stats.incrementDeserializationExceptions(currentMessage.getTopicName());
- throw ex;
- }
long processAt = System.currentTimeMillis();
stats.incrementProcessed(processAt);
addLogTopicHandler();
- JavaExecutionResult result = javaInstance.handleMessage(
- currentMessage.getActualMessage().getMessageId(),
- currentMessage.getTopicName(),
- input);
+ JavaExecutionResult result;
+ MessageId messageId = null;
+ String topicName = null;
+
+ if (currentRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarRecord = (PulsarRecord) currentRecord;
+ messageId = pulsarRecord.getMessageId();
+ topicName = pulsarRecord.getTopicName();
+ }
+ result = javaInstance.handleMessage(messageId, topicName,
currentRecord.getValue());
+
removeLogTopicHandler();
long doneProcessing = System.currentTimeMillis();
@@ -209,18 +216,25 @@ public void run() {
try {
completableFuture.join();
} catch (Exception e) {
- log.error("Failed to flush the state updates of
message {}", currentMessage, e);
- throw e;
+ log.error("Failed to flush the state updates of
message {}", currentRecord, e);
+ currentRecord.fail();
}
}
- processResult(currentMessage, result, processAt,
doneProcessing);
+ try {
+ processResult(currentRecord, result, processAt,
doneProcessing);
+ } catch (Exception e) {
+ log.warn("Failed to process result of message {}",
currentRecord, e);
+ currentRecord.fail();
+ }
}
-
- javaInstance.close();
} catch (Exception ex) {
- log.info("Uncaught exception in Java Instance", ex);
- failureException = ex;
- throw new RuntimeException(ex);
+ log.error("Uncaught exception in Java Instance", ex);
+ if (running.get()) {
+ failureException = ex;
+ throw new RuntimeException(ex);
+ }
+ } finally {
+ close();
}
}
@@ -286,15 +300,15 @@ private void setupStateTable() throws Exception {
this.stateTable = result(storageClient.openTable(tableName));
}
- private void processResult(InputMessage msg,
+ private void processResult(Record srcRecord,
JavaExecutionResult result,
long startTime, long endTime) throws Exception {
if (result.getUserException() != null) {
- log.info("Encountered user exception when processing message {}",
msg, result.getUserException());
+ log.info("Encountered user exception when processing message {}",
srcRecord, result.getUserException());
stats.incrementUserExceptions(result.getUserException());
- throw result.getUserException();
+ this.currentRecord.fail();
} else if (result.getSystemException() != null) {
- log.info("Encountered system exception when processing message
{}", msg, result.getSystemException());
+ log.info("Encountered system exception when processing message
{}", srcRecord, result.getSystemException());
stats.incrementSystemExceptions(result.getSystemException());
throw result.getSystemException();
} else {
@@ -308,35 +322,47 @@ private void processResult(InputMessage msg,
throw ex;
}
if (output != null) {
- sendOutputMessage(msg, output);
+ sendOutputMessage(srcRecord, output);
} else {
- processor.sendOutputMessage(msg, null);
+ processor.sendOutputMessage(srcRecord, null);
}
} else {
// the function doesn't produce any result or the user doesn't
want the result.
- processor.sendOutputMessage(msg, null);
+ processor.sendOutputMessage(srcRecord, null);
}
}
}
- private void sendOutputMessage(InputMessage srcMsg,
+ private void sendOutputMessage(Record srcRecord,
byte[] output) throws Exception {
- MessageBuilder msgBuilder = MessageBuilder.create()
- .setContent(output)
- .setProperty("__pfn_input_topic__", srcMsg.getTopicName())
- .setProperty("__pfn_input_msg_id__", new
String(Base64.getEncoder().encode(srcMsg.getActualMessage().getMessageId().toByteArray())));
- processor.sendOutputMessage(srcMsg, msgBuilder);
+ MessageBuilder msgBuilder = MessageBuilder.create();
+ if (srcRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+ msgBuilder
+ .setContent(output)
+ .setProperty("__pfn_input_topic__",
pulsarMessage.getTopicName())
+ .setProperty("__pfn_input_msg_id__", new
String(Base64.getEncoder().encode(pulsarMessage.getMessageId().toByteArray())));
+ }
+
+ processor.sendOutputMessage(srcRecord, msgBuilder);
+ }
+
+ /**
+ * Stop java instance runnable
+ */
+ public void stop() {
+ this.running.set(false);
}
@Override
public void close() {
- if (!running) {
+ if (!running.get()) {
return;
}
- running = false;
processor.close();
+ javaInstance.close();
// kill the state table
if (null != stateTable) {
@@ -401,39 +427,12 @@ private static void addSystemMetrics(String metricName,
double value, InstanceCo
bldr.putMetrics(metricName, digest);
}
- private static SerDe initializeSerDe(String serdeClassName, ClassLoader
clsLoader,
- Class<?>[] typeArgs, boolean
inputArgs) {
- if (null == serdeClassName || serdeClassName.isEmpty()) {
- return null;
- } else if (serdeClassName.equals(DefaultSerDe.class.getName())) {
- return initializeDefaultSerDe(typeArgs, inputArgs);
- } else {
- return Reflections.createInstance(
- serdeClassName,
- SerDe.class,
- clsLoader);
- }
- }
-
- private static SerDe initializeDefaultSerDe(Class<?>[] typeArgs, boolean
inputArgs) {
- if (inputArgs) {
- if (!DefaultSerDe.IsSupportedType(typeArgs[0])) {
- throw new RuntimeException("Default Serializer does not
support " + typeArgs[0]);
- }
- return new DefaultSerDe(typeArgs[0]);
- } else {
- if (!DefaultSerDe.IsSupportedType(typeArgs[1])) {
- throw new RuntimeException("Default Serializer does not
support " + typeArgs[1]);
- }
- return new DefaultSerDe(typeArgs[1]);
- }
- }
-
private void setupSerDe(Class<?>[] typeArgs, ClassLoader clsLoader) {
+
this.inputSerDe = new HashMap<>();
-
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) ->
this.inputSerDe.put(k, initializeSerDe(v, clsLoader, typeArgs, true)));
+
instanceConfig.getFunctionDetails().getCustomSerdeInputsMap().forEach((k, v) ->
this.inputSerDe.put(k, InstanceUtils.initializeSerDe(v, clsLoader,
typeArgs[0])));
for (String topicName :
instanceConfig.getFunctionDetails().getInputsList()) {
- this.inputSerDe.put(topicName, initializeDefaultSerDe(typeArgs,
true));
+ this.inputSerDe.put(topicName,
InstanceUtils.initializeDefaultSerDe(typeArgs[0]));
}
if (Void.class.equals(typeArgs[0])) {
@@ -458,9 +457,9 @@ private void setupSerDe(Class<?>[] typeArgs, ClassLoader
clsLoader) {
if (instanceConfig.getFunctionDetails().getOutputSerdeClassName()
== null
||
instanceConfig.getFunctionDetails().getOutputSerdeClassName().isEmpty()
||
instanceConfig.getFunctionDetails().getOutputSerdeClassName().equals(DefaultSerDe.class.getName()))
{
- outputSerDe = initializeDefaultSerDe(typeArgs, false);
+ outputSerDe =
InstanceUtils.initializeDefaultSerDe(typeArgs[1]);
} else {
- this.outputSerDe =
initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(),
clsLoader, typeArgs, false);
+ this.outputSerDe =
InstanceUtils.initializeSerDe(instanceConfig.getFunctionDetails().getOutputSerdeClassName(),
clsLoader, typeArgs[1]);
}
Class<?>[] outputSerdeTypeArgs =
TypeResolver.resolveRawArguments(SerDe.class, outputSerDe.getClass());
if
(outputSerDe.getClass().getName().equals(DefaultSerDe.class.getName())) {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
similarity index 58%
rename from
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
index bd7e788836..c812a77aae 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InputMessage.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarConfig.java
@@ -18,39 +18,25 @@
*/
package org.apache.pulsar.functions.instance;
+import lombok.Builder;
+import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.functions.api.SerDe;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.Map;
@Getter
@Setter
+@Data
+@Builder
@ToString
-public class InputMessage {
-
- private Message actualMessage;
- String topicName;
- SerDe inputSerDe;
- Consumer consumer;
-
- public int getTopicPartition() {
- MessageIdImpl msgId = (MessageIdImpl) actualMessage.getMessageId();
- return msgId.getPartitionIndex();
- }
-
- public void ack() {
- if (null != consumer) {
- consumer.acknowledgeAsync(actualMessage);
- }
- }
-
- public void ackCumulative() {
- if (null != consumer) {
- consumer.acknowledgeCumulativeAsync(actualMessage);
- }
- }
-
+public class PulsarConfig {
+ private Function.FunctionDetails.ProcessingGuarantees processingGuarantees;
+ private SubscriptionType subscriptionType;
+ private String subscription;
+ private Map<String, SerDe> topicToSerdeMap;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
new file mode 100644
index 0000000000..2bdbdb1f97
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarRecord.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.connect.core.Record;
+
+@Data
+@Builder
+@Getter
+@ToString
+@EqualsAndHashCode
+public class PulsarRecord<T> implements Record<T> {
+
+ private String partitionId;
+ private Long sequenceId;
+ private T value;
+ private MessageId messageId;
+ private String topicName;
+ private Runnable failFunction;
+ private Runnable ackFunction;
+
+ @Override
+ public void ack() {
+ this.ackFunction.run();
+ }
+
+ @Override
+ public void fail() {
+ this.failFunction.run();
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
new file mode 100644
index 0000000000..43d9350922
--- /dev/null
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/PulsarSource.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.instance;
+
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageIdImpl;
+import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class PulsarSource<T> implements Source<T> {
+
+ private PulsarClient pulsarClient;
+ private PulsarConfig pulsarConfig;
+
+ @Getter
+ private org.apache.pulsar.client.api.Consumer inputConsumer;
+
+ public PulsarSource(PulsarClient pulsarClient, PulsarConfig pulsarConfig) {
+ this.pulsarClient = pulsarClient;
+ this.pulsarConfig = pulsarConfig;
+ }
+
+ @Override
+ public void open(Map<String, Object> config) throws Exception {
+ this.inputConsumer = this.pulsarClient.newConsumer()
+ .topics(new
ArrayList<>(this.pulsarConfig.getTopicToSerdeMap().keySet()))
+ .subscriptionName(this.pulsarConfig.getSubscription())
+ .subscriptionType(this.pulsarConfig.getSubscriptionType())
+ .ackTimeout(1, TimeUnit.MINUTES)
+ .subscribe();
+ }
+
+ @Override
+ public Record<T> read() throws Exception {
+ org.apache.pulsar.client.api.Message<T> message =
this.inputConsumer.receive();
+
+ String topicName;
+ String partitionId;
+
+ // If more than one topics are being read than the Message return by
the consumer will be TopicMessageImpl
+ // If there is only topic being read then the Message returned by the
consumer wil be MessageImpl
+ if (message instanceof TopicMessageImpl) {
+ topicName = ((TopicMessageImpl) message).getTopicName();
+ TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
message.getMessageId();
+ MessageIdImpl messageId = (MessageIdImpl)
topicMessageId.getInnerMessageId();
+ partitionId = Long.toString(messageId.getPartitionIndex());
+ } else {
+ topicName =
this.pulsarConfig.getTopicToSerdeMap().keySet().iterator().next();
+ partitionId = Long.toString(((MessageIdImpl)
message.getMessageId()).getPartitionIndex());
+ }
+
+ Object object;
+ try {
+ object =
this.pulsarConfig.getTopicToSerdeMap().get(topicName).deserialize(message.getData());
+ } catch (Exception e) {
+ //TODO Add deserialization exception stats
+ throw new RuntimeException("Error occured when attempting to
deserialize input:", e);
+ }
+
+ T input;
+ try {
+ input = (T) object;
+ } catch (ClassCastException e) {
+ throw new RuntimeException("Error in casting input to expected
type:", e);
+ }
+
+ PulsarRecord<T> pulsarMessage = (PulsarRecord<T>)
PulsarRecord.builder()
+ .value(input)
+ .messageId(message.getMessageId())
+ .partitionId(partitionId)
+ .sequenceId(message.getSequenceId())
+ .topicName(topicName)
+ .ackFunction(() -> {
+ if (pulsarConfig.getProcessingGuarantees()
+ ==
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ inputConsumer.acknowledgeCumulativeAsync(message);
+ } else {
+ inputConsumer.acknowledgeAsync(message);
+ }
+ }).failFunction(() -> {
+ if (pulsarConfig.getProcessingGuarantees()
+ ==
Function.FunctionDetails.ProcessingGuarantees.EFFECTIVELY_ONCE) {
+ throw new RuntimeException("Failed to process message:
" + message.getMessageId());
+ }
+ })
+ .build();
+ return pulsarMessage;
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.inputConsumer.close();
+ }
+}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
index 86aeb83efa..465d1989d5 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtLeastOnceProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.concurrent.LinkedBlockingDeque;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
@@ -27,7 +26,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -52,15 +51,17 @@ protected void initializeOutputProducer(String outputTopic)
throws Exception {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
+ public void sendOutputMessage(Record srcRecord, MessageBuilder
outputMsgBuilder) {
if (null == outputMsgBuilder || null == producer) {
- inputMsg.ack();
+ srcRecord.ack();
return;
}
Message<byte[]> outputMsg = outputMsgBuilder.build();
producer.sendAsync(outputMsg)
- .thenAccept(msgId -> inputMsg.ack());
+ .thenAccept(msgId -> {
+ srcRecord.ack();
+ });
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
index 994be0d03a..08cf11170b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/AtMostOnceProcessor.java
@@ -18,7 +18,6 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.concurrent.LinkedBlockingDeque;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
@@ -26,7 +25,7 @@
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -45,10 +44,10 @@
}
@Override
- protected void postReceiveMessage(InputMessage message) {
- super.postReceiveMessage(message);
+ public void postReceiveMessage(Record record) {
+ super.postReceiveMessage(record);
if (functionDetails.getAutoAck()) {
- message.ack();
+ record.ack();
}
}
@@ -58,7 +57,7 @@ protected void initializeOutputProducer(String outputTopic)
throws Exception {
}
@Override
- public void sendOutputMessage(InputMessage inputMsg,
MessageBuilder<byte[]> outputMsgBuilder) {
+ public void sendOutputMessage(Record srcRecord, MessageBuilder
outputMsgBuilder) {
if (null == outputMsgBuilder) {
return;
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
index 59c7bd6f3f..aafca69534 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/EffectivelyOnceProcessor.java
@@ -18,31 +18,22 @@
*/
package org.apache.pulsar.functions.instance.processors;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import
org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.functions.instance.PulsarRecord;
import
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
-import org.apache.pulsar.functions.utils.Utils;
/**
* A message processor that process messages effectively-once.
@@ -50,8 +41,6 @@
@Slf4j
class EffectivelyOnceProcessor extends MessageProcessorBase implements
ConsumerEventListener {
- private LinkedList<String> inputTopicsToResubscribe = null;
-
@Getter(AccessLevel.PACKAGE)
protected Producers outputProducer;
@@ -103,24 +92,28 @@ protected void initializeOutputProducer(String
outputTopic) throws Exception {
//
@Override
- public void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder<byte[]> outputMsgBuilder)
throws Exception {
+ public void sendOutputMessage(Record srcRecord,
+ MessageBuilder outputMsgBuilder) throws
Exception {
if (null == outputMsgBuilder) {
- inputMsg.ackCumulative();
+ srcRecord.ack();
return;
}
// assign sequence id to output message for idempotent producing
outputMsgBuilder = outputMsgBuilder
-
.setSequenceId(Utils.getSequenceId(inputMsg.getActualMessage().getMessageId()));
-
-
- Producer<byte[]> producer =
outputProducer.getProducer(inputMsg.getTopicName(),
inputMsg.getTopicPartition());
-
- Message<byte[]> outputMsg = outputMsgBuilder.build();
- producer.sendAsync(outputMsg)
- .thenAccept(messageId -> inputMsg.ackCumulative())
- .join();
+ .setSequenceId(srcRecord.getRecordSequence());
+
+ // currently on PulsarRecord
+ if (srcRecord instanceof PulsarRecord) {
+ PulsarRecord pulsarMessage = (PulsarRecord) srcRecord;
+ Producer producer =
outputProducer.getProducer(pulsarMessage.getTopicName(),
+ Integer.parseInt(srcRecord.getPartitionId()));
+
+ org.apache.pulsar.client.api.Message outputMsg =
outputMsgBuilder.build();
+ producer.sendAsync(outputMsg)
+ .thenAccept(messageId -> srcRecord.ack())
+ .join();
+ }
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
index 97c971d0a1..fd22adb261 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessor.java
@@ -19,16 +19,15 @@
package org.apache.pulsar.functions.instance.processors;
import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
+
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import
org.apache.pulsar.functions.proto.Function.FunctionDetails.ProcessingGuarantees;
@@ -68,6 +67,8 @@ static MessageProcessor create(PulsarClient client,
}
}
+ void postReceiveMessage(Record record);
+
/**
* Setup the input with a provided <i>processQueue</i>. The implementation
of this processor is responsible for
* setting up the input and passing the received messages from input to
the provided <i>processQueue</i>.
@@ -78,11 +79,11 @@ void setupInput(Map<String, SerDe> inputSerDe)
throws Exception;
/**
- * Return the input.
+ * Return the source.
*
- * @return the input consumer.
+ * @return the source.
*/
- Consumer<byte[]> getInputConsumer();
+ Source getSource();
/**
* Setup the output with a provided <i>outputSerDe</i>. The implementation
of this processor is responsible for
@@ -99,18 +100,18 @@ void setupInput(Map<String, SerDe> inputSerDe)
* <p>If the <i>outputMsgBuilder</i> is null, the implementation doesn't
have to send any messages to the output.
* The implementation can decide to acknowledge the input message based on
its process guarantees.
*
- * @param inputMsg input message
+ * @param srcRecord record from source
* @param outputMsgBuilder output message builder. it can be null.
*/
- void sendOutputMessage(InputMessage inputMsg,
- MessageBuilder<byte[]> outputMsgBuilder) throws
PulsarClientException, Exception;
+ void sendOutputMessage(Record srcRecord,
+ MessageBuilder outputMsgBuilder) throws
PulsarClientException, Exception;
/**
* Get the next message to process
* @return the next input message
* @throws Exception
*/
- InputMessage recieveMessage() throws Exception;
+ Record recieveMessage() throws Exception;
@Override
void close();
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
index fc37b134ef..06bc175730 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/processors/MessageProcessorBase.java
@@ -25,18 +25,17 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.Message;
+import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
-import org.apache.pulsar.client.impl.TopicMessageImpl;
+import org.apache.pulsar.connect.core.Record;
+import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.InputMessage;
+import org.apache.pulsar.functions.instance.PulsarConfig;
+import org.apache.pulsar.functions.instance.PulsarSource;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
+import org.apache.pulsar.functions.utils.Reflections;
/**
* The base implementation of {@link MessageProcessor}.
@@ -48,12 +47,8 @@
protected final FunctionDetails functionDetails;
protected final SubscriptionType subType;
- protected Map<String, SerDe> inputSerDe;
-
- protected SerDe outputSerDe;
-
@Getter
- protected Consumer<byte[]> inputConsumer;
+ protected Source source;
protected List<String> topics;
@@ -64,6 +59,8 @@ protected MessageProcessorBase(PulsarClient client,
this.functionDetails = functionDetails;
this.subType = subType;
this.topics = new LinkedList<>();
+ this.topics.addAll(this.functionDetails.getInputsList());
+
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
}
//
@@ -72,36 +69,52 @@ protected MessageProcessorBase(PulsarClient client,
@Override
public void setupInput(Map<String, SerDe> inputSerDe) throws Exception {
- log.info("Setting up input with input serdes: {}", inputSerDe);
- this.inputSerDe = inputSerDe;
-
this.topics.addAll(this.functionDetails.getCustomSerdeInputsMap().keySet());
- this.topics.addAll(this.functionDetails.getInputsList());
- this.inputConsumer = this.client.newConsumer()
- .topics(this.topics)
-
.subscriptionName(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
- .subscriptionType(getSubscriptionType())
- .subscribe();
+ org.apache.pulsar.functions.proto.Function.ConnectorDetails
connectorDetails = this.functionDetails.getSource();
+ Object object;
+ if
(connectorDetails.getClassName().equals(PulsarSource.class.getName())) {
+ PulsarConfig pulsarConfig = PulsarConfig.builder()
+ .topicToSerdeMap(inputSerDe)
+
.subscription(FunctionDetailsUtils.getFullyQualifiedName(this.functionDetails))
+
.processingGuarantees(this.functionDetails.getProcessingGuarantees())
+ .subscriptionType(this.subType)
+ .build();
+ Object[] params = {this.client, pulsarConfig};
+ Class[] paramTypes = {PulsarClient.class, PulsarConfig.class};
+ object = Reflections.createInstance(
+ connectorDetails.getClassName(),
+ PulsarSource.class.getClassLoader(), params, paramTypes);
+
+ } else {
+ object = Reflections.createInstance(
+ connectorDetails.getClassName(),
+ Thread.currentThread().getContextClassLoader());
+ }
+
+ Class<?>[] typeArgs;
+ if (object instanceof Source) {
+ typeArgs = TypeResolver.resolveRawArguments(Source.class,
object.getClass());
+ assert typeArgs.length > 0;
+ } else {
+ throw new RuntimeException("Source does not implement correct
interface");
+ }
+ this.source = (Source) object;
+
+ try {
+ this.source.open(connectorDetails.getConfigsMap());
+ } catch (Exception e) {
+ log.info("Error occurred executing open for source: {}",
+ this.functionDetails.getSource().getClassName(), e);
+ }
+
}
protected SubscriptionType getSubscriptionType() {
return subType;
}
- public InputMessage recieveMessage() throws PulsarClientException {
- Message message = this.inputConsumer.receive();
- String topicName;
- if (message instanceof TopicMessageImpl) {
- topicName = ((TopicMessageImpl)message).getTopicName();
- } else {
- topicName = this.topics.get(0);
- }
- InputMessage inputMessage = new InputMessage();
- inputMessage.setConsumer(inputConsumer);
- inputMessage.setInputSerDe(inputSerDe.get(topicName));
- inputMessage.setActualMessage(message);
- inputMessage.setTopicName(topicName);
- return inputMessage;
+ public Record recieveMessage() throws Exception {
+ return this.source.read();
}
/**
@@ -110,9 +123,10 @@ public InputMessage recieveMessage() throws
PulsarClientException {
* <p>The processor implementation can make a decision to process the
message based on its processing guarantees.
* for example, an at-most-once processor can ack the message immediately.
*
- * @param message input message.
+ * @param record input message.
*/
- protected void postReceiveMessage(InputMessage message) {}
+ @Override
+ public void postReceiveMessage(Record record) {}
//
// Output
@@ -120,8 +134,6 @@ protected void postReceiveMessage(InputMessage message) {}
@Override
public void setupOutput(SerDe outputSerDe) throws Exception {
- this.outputSerDe = outputSerDe;
-
String outputTopic = functionDetails.getOutput();
if (outputTopic != null
&& !functionDetails.getOutput().isEmpty()
@@ -141,10 +153,9 @@ public void setupOutput(SerDe outputSerDe) throws
Exception {
public void close() {
try {
- this.inputConsumer.close();
- } catch (PulsarClientException e) {
- log.warn("Failed to close consumer to input topics {}",
- ((MultiTopicsConsumerImpl)
this.inputConsumer).getTopics(), e);
+ this.source.close();
+ } catch (Exception e) {
+ log.warn("Failed to close source {}", this.source, e);
}
}
}
diff --git
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index b9e762a583..c0aae7c024 100644
---
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.functions.instance;
+import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
@@ -50,7 +51,7 @@ public void testLambda() {
JavaInstance instance = new JavaInstance(
config,
(Function<String, String>) (input, context) -> input + "-lambda",
- null, null, null);
+ null, null, mock(PulsarSource.class));
String testString = "ABC123";
JavaExecutionResult result =
instance.handleMessage(MessageId.earliest, "random", testString);
assertNotNull(result.getResult());
diff --git a/pulsar-functions/proto/src/main/proto/Function.proto
b/pulsar-functions/proto/src/main/proto/Function.proto
index c9b3da2831..1aeb21adbc 100644
--- a/pulsar-functions/proto/src/main/proto/Function.proto
+++ b/pulsar-functions/proto/src/main/proto/Function.proto
@@ -53,6 +53,12 @@ message FunctionDetails {
bool autoAck = 13;
repeated string inputs = 14;
int32 parallelism = 15;
+ ConnectorDetails source = 16;
+}
+
+message ConnectorDetails {
+ string className = 1;
+ map<string, string> configs = 4;
}
message PackageLocationMetaData {
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 88f1ac76ee..4b14dad5c1 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -30,6 +30,7 @@
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function.ConnectorDetails;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
@@ -106,6 +107,13 @@
@Parameter(names = "--subscription_type", description = "What subscription
type to use")
protected FunctionDetails.SubscriptionType subscriptionType;
+ @Parameter(names = "--source_classname", description = "The source
classname")
+ protected String sourceClassname;
+
+ @Parameter(names = "--source_configs", description = "The source
classname")
+ protected String sourceConfigs;
+
+
private Server server;
public JavaInstanceMain() { }
@@ -159,6 +167,16 @@ public void start() throws Exception {
Map<String, String> userConfigMap = new
Gson().fromJson(userConfig, type);
functionDetailsBuilder.putAllUserConfig(userConfigMap);
}
+
+ ConnectorDetails.Builder sourceDetailsBuilder =
ConnectorDetails.newBuilder();
+ sourceDetailsBuilder.setClassName(sourceClassname);
+ if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+ Type type = new TypeToken<Map<String, String>>(){}.getType();
+ Map<String, String> sourceConfigMap = new
Gson().fromJson(sourceConfigs, type);
+ sourceDetailsBuilder.putAllConfigs(sourceConfigMap);
+ }
+ functionDetailsBuilder.setSource(sourceDetailsBuilder);
+
FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index def06cfea2..8da2b9d1de 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -175,6 +175,13 @@
instancePort = findAvailablePort();
args.add("--port");
args.add(String.valueOf(instancePort));
+ args.add("--source_classname");
+
args.add(instanceConfig.getFunctionDetails().getSource().getClassName());
+ Map<String, String> sourceConfigs =
instanceConfig.getFunctionDetails().getSource().getConfigsMap();
+ if (sourceConfigs != null && !sourceConfigs.isEmpty()) {
+ args.add("--source_config");
+ args.add(new Gson().toJson(sourceConfigs));
+ }
return args;
}
diff --git
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 830e18a9f1..136951eaa0 100644
---
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -79,7 +79,15 @@ public void start() {
@Override
public void uncaughtException(Thread t, Throwable e) {
startupException = new Exception(e);
+ log.error("Error occured in java instance:", e);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e1) {
+ //ignore
+ }
+ // restart
start();
+
}
});
this.fnThread.start();
@@ -95,9 +103,10 @@ public void join() throws Exception {
@Override
public void stop() {
if (fnThread != null) {
+ // Stop instance thread
+ javaInstanceRunnable.stop();
// interrupt the instance thread
fnThread.interrupt();
- javaInstanceRunnable.close();
try {
fnThread.join();
} catch (InterruptedException e) {
diff --git
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 7042c266db..67ac82f2b0 100644
---
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,6 +21,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.runtime.ProcessRuntime;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
@@ -80,6 +81,8 @@ FunctionDetails createFunctionDetails(FunctionDetails.Runtime
runtime) {
functionDetailsBuilder.setOutput(TEST_NAME + "-output");
functionDetailsBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+ functionDetailsBuilder.setSource(Function.ConnectorDetails.newBuilder()
+ .setClassName("org.pulsar.pulsar.TestSource"));
return functionDetailsBuilder.build();
}
@@ -101,8 +104,7 @@ public void testJavaConstructor() {
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 43);
- args.remove(args.size() - 1);
+ assertEquals(args.size(), 45);
String expectedArgs = "java -cp " + javaInstanceJarFile + "
-Dlog4j.configurationFile=java_instance_log4j2.yml "
+ "-Dpulsar.log.dir=" + logDirectory + "/functions" + "
-Dpulsar.log.file=" + config.getFunctionDetails().getName()
+ " org.apache.pulsar.functions.runtime.JavaInstanceMain"
@@ -120,7 +122,8 @@ public void testJavaConstructor() {
+ " --output_serde_classname " +
config.getFunctionDetails().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port";
+ + " --max_buffered_tuples 1024 --port " + args.get(42)
+ + " --source_classname " +
config.getFunctionDetails().getSource().getClassName();
assertEquals(expectedArgs, String.join(" ", args));
}
@@ -130,8 +133,7 @@ public void testPythonConstructor() {
ProcessRuntime container = factory.createContainer(config,
userJarFile);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 42);
- args.remove(args.size() - 1);
+ assertEquals(args.size(), 44);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " +
config.getFunctionDetails().getName() + " --instance_id "
@@ -148,7 +150,8 @@ public void testPythonConstructor() {
+ " --output_serde_classname " +
config.getFunctionDetails().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
- + " --max_buffered_tuples 1024 --port";
+ + " --max_buffered_tuples 1024 --port " + args.get(41)
+ + " --source_classname " +
config.getFunctionDetails().getSource().getClassName();
assertEquals(expectedArgs, String.join(" ", args));
}
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
index 77060302f0..5ecdd4f347 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionDetailsUtils.java
@@ -43,17 +43,6 @@ public static String extractFunctionNameFromFQN(String
fullyQualifiedName) {
return fullyQualifiedName.split("/")[2];
}
- public static boolean areAllRequiredFieldsPresent(FunctionDetails
FunctionDetails) {
- if (FunctionDetails.getTenant() == null ||
FunctionDetails.getNamespace() == null
- || FunctionDetails.getName() == null ||
FunctionDetails.getClassName() == null
- || (FunctionDetails.getInputsCount() <= 0 &&
FunctionDetails.getCustomSerdeInputsCount() <= 0)
- || FunctionDetails.getParallelism() <= 0) {
- return false;
- } else {
- return true;
- }
- }
-
public static String getDownloadFileName(FunctionDetails FunctionDetails) {
String[] hierarchy = FunctionDetails.getClassName().split("\\.");
String fileName;
diff --git
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 890195cafb..64418bbb78 100644
---
a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++
b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -116,6 +116,40 @@ public static Object createInstance(String userClassName,
}
+ public static Object createInstance(String userClassName,
+ ClassLoader classLoader, Object[]
params, Class[] paramTypes) {
+ if (params.length != paramTypes.length) {
+ throw new RuntimeException(
+ "Unequal number of params and paramTypes. Each param must
have a correspoinding param type");
+ }
+ Class<?> theCls;
+ try {
+ theCls = Class.forName(userClassName, true, classLoader);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("User class must be in class path",
cnfe);
+ }
+ Object result;
+ try {
+ Constructor<?> meth = constructorCache.get(theCls);
+ if (null == meth) {
+ meth = theCls.getDeclaredConstructor(paramTypes);
+ meth.setAccessible(true);
+ constructorCache.put(theCls, meth);
+ }
+ result = meth.newInstance(params);
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("User class must be concrete", ie);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("User class doesn't have such method",
e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("User class must have a no-arg
constructor", e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("User class constructor throws
exception", e);
+ }
+ return result;
+
+ }
+
public static Object createInstance(String userClassName, java.io.File
jar) {
try {
return createInstance(userClassName, loadJar(jar));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services