eolivelli commented on a change in pull request #10270:
URL: https://github.com/apache/pulsar/pull/10270#discussion_r618639963



##########
File path: 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
##########
@@ -290,63 +318,63 @@ public void run() {
         }
     }
 
-    private void setupStateStore() throws Exception {
-        this.stateManager = new InstanceStateManager();
-
-        if (null == stateStorageServiceUrl) {
-            stateStoreProvider = StateStoreProvider.NULL;
-        } else {
-            stateStoreProvider = new BKStateStoreProviderImpl();
-            Map<String, Object> stateStoreProviderConfig = new HashMap();
-            
stateStoreProviderConfig.put(BKStateStoreProviderImpl.STATE_STORAGE_SERVICE_URL,
 stateStorageServiceUrl);
-            stateStoreProvider.init(stateStoreProviderConfig, 
instanceConfig.getFunctionDetails());
-
-            StateStore store = stateStoreProvider.getStateStore(
-                instanceConfig.getFunctionDetails().getTenant(),
-                instanceConfig.getFunctionDetails().getNamespace(),
-                instanceConfig.getFunctionDetails().getName()
-            );
-            StateStoreContext context = new StateStoreContextImpl();
-            store.init(context);
-
-            stateManager.registerStore(store);
-        }
-    }
-
-    private void processResult(Record srcRecord,
-                               CompletableFuture<JavaExecutionResult> result) 
throws Exception {
-        result.whenComplete((result1, throwable) -> {
-            if (throwable != null || result1.getUserException() != null) {
-                Throwable t = throwable != null ? throwable : 
result1.getUserException();
-                log.warn("Encountered exception when processing message {}",
-                        srcRecord, t);
-                stats.incrUserExceptions(t);
-                srcRecord.fail();
-            } else {
-                if (result1.getResult() != null) {
-                    sendOutputMessage(srcRecord, result1.getResult());
-                } else {
-                    if (instanceConfig.getFunctionDetails().getAutoAck()) {
-                        // the function doesn't produce any result or the user 
doesn't want the result.
-                        srcRecord.ack();
-                    }
-                }
-                // increment total successfully processed
-                stats.incrTotalProcessedSuccessfully();
+    @SuppressWarnings("rawtypes")
+       private void processResult(Record srcRecord, JavaExecutionResult 
result) throws Exception {
+       
+       if (result.getUserException() != null) {
+               log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getUserException());
+            stats.incrUserExceptions(result.getUserException());
+            throw result.getUserException();
+       } 
+       
+       if (result.getSystemException() != null) {
+               log.warn("Encountered exception when processing message {}",
+                    srcRecord, result.getSystemException());
+            stats.incrSysExceptions(result.getSystemException());
+            throw result.getSystemException();
+       }
+       
+       if (result.getResult() == null) {
+               if (instanceConfig.getFunctionDetails().getAutoAck()) {
+                // the function doesn't produce any result or the user doesn't 
want the result.
+                srcRecord.ack();
             }
-        });
+       } else { 
+               
+               try {
+                               Object output = (result.getResult() instanceof 
CompletableFuture) ?
+                                       
((CompletableFuture)result.getResult()).get() : result.getResult();
+                               
+                               sendOutputMessage(srcRecord, output);
+                               
+                               if 
(instanceConfig.getFunctionDetails().getAutoAck()) {
+                                       srcRecord.ack();
+                               }
+                               stats.incrTotalProcessedSuccessfully();
+                               
+                       } catch (InterruptedException | ExecutionException e) {
+                               log.warn("Encountered exception when processing 
message {}",

Review comment:
       Because it is the standard Java practice.
   When you catch an InterruptedException, the "interrupted" flag on the 
Thread has already been cleared, so you have to set it again (by calling 
Thread.currentThread().interrupt()) in order to let the information bubble up 
to the caller.
   
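   > why only for that exception and not the ExecutionException
   Because ExecutionException is not the special (and annoying) 
InterruptedException: it only wraps the failure of the computation and does 
not touch the thread's interrupted flag, so there is nothing to restore.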




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to