david-streamlio commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618603122
##########
File path:
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
}
}
- private void setupStateStore() throws Exception {
- this.stateManager = new InstanceStateManager();
-
- if (null == stateStorageServiceUrl) {
- stateStoreProvider = StateStoreProvider.NULL;
- } else {
- stateStoreProvider = new BKStateStoreProviderImpl();
- Map<String, Object> stateStoreProviderConfig = new HashMap();
-
stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL,
stateStorageServiceUrl);
- stateStoreProvider.init(stateStoreProviderConfig,
instanceConfig.getFunctionDetails());
-
- StateStore store = stateStoreProvider.getStateStore(
- instanceConfig.getFunctionDetails().getTenant(),
- instanceConfig.getFunctionDetails().getNamespace(),
- instanceConfig.getFunctionDetails().getName()
- );
- StateStoreContext context = new StateStoreContextImpl();
- store.init(context);
-
- stateManager.registerStore(store);
- }
- }
-
- private void processResult(Record srcRecord,
- CompletableFuture<JavaExecutionResult> result)
throws Exception {
- result.whenComplete((result1, throwable) -> {
- if (throwable != null || result1.getUserException() != null) {
- Throwable t = throwable != null ? throwable :
result1.getUserException();
- log.warn("Encountered exception when processing message {}",
- srcRecord, t);
- stats.incrUserExceptions(t);
- srcRecord.fail();
- } else {
- if (result1.getResult() != null) {
- sendOutputMessage(srcRecord, result1.getResult());
- } else {
- if (instanceConfig.getFunctionDetails().getAutoAck()) {
- // the function doesn't produce any result or the user
doesn't want the result.
- srcRecord.ack();
- }
- }
- // increment total successfully processed
- stats.incrTotalProcessedSuccessfully();
+ @SuppressWarnings("rawtypes")
+ private void processResult(Record srcRecord, JavaExecutionResult
result) throws Exception {
+
+ if (result.getUserException() != null) {
+ log.warn("Encountered exception when processing message {}",
+ srcRecord, result.getUserException());
+ stats.incrUserExceptions(result.getUserException());
+ throw result.getUserException();
+ }
+
+ if (result.getSystemException() != null) {
+ log.warn("Encountered exception when processing message {}",
+ srcRecord, result.getSystemException());
+ stats.incrSysExceptions(result.getSystemException());
+ throw result.getSystemException();
+ }
+
+ if (result.getResult() == null) {
+ if (instanceConfig.getFunctionDetails().getAutoAck()) {
+ // the function doesn't produce any result or the user doesn't
want the result.
+ srcRecord.ack();
}
- });
+ } else {
+
+ try {
+ Object output = (result.getResult() instanceof
CompletableFuture) ?
+
((CompletableFuture)result.getResult()).get() : result.getResult();
+
+ sendOutputMessage(srcRecord, output);
+
+ if
(instanceConfig.getFunctionDetails().getAutoAck()) {
+ srcRecord.ack();
+ }
+ stats.incrTotalProcessedSuccessfully();
+
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Encountered exception when processing
message {}",
Review comment:
@eolivelli Just to clarify, you want to call
`Thread.currentThread().interrupt()` when we encounter an
`InterruptedException`? Why, and why only for that exception and not the
`ExecutionException`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org