This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch SP-1133
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/SP-1133 by this push:
     new 5ad116436 Minor improvements to code style (#1133)
5ad116436 is described below

commit 5ad11643625fe3f954adf2747b6ebe37a7b27238
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Jan 20 20:42:17 2023 +0100

    Minor improvements to code style (#1133)
---
 .../standalone/function/StreamPipesFunction.java      | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
index 8f79526e2..eb0a49c27 100644
--- a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
+++ b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/function/StreamPipesFunction.java
@@ -65,12 +65,15 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
   @Override
   public void invokeRuntime(String serviceGroup) {
     var functionId = this.getFunctionConfig().getFunctionId();
-    this.outputCollectors = this.getOutputCollectors();
-    this.outputCollectors.forEach((key, value) -> value.connect());
 
-    FunctionContext context = new FunctionContextGenerator(
-        functionId.getId(), serviceGroup, this.requiredStreamIds(), this.outputCollectors)
-        .generate();
+    this.initializeProducers();
+
+    var context = new FunctionContextGenerator(
+        functionId.getId(),
+        serviceGroup,
+        this.requiredStreamIds(),
+        this.outputCollectors
+    ).generate();
 
     // Creates a source info for each incoming SpDataStream
     // The index is used to create the selector prefix for the SourceInfo
@@ -87,6 +90,7 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
     this.inputCollectors = getInputCollectors(context.getStreams());
 
     LOG.info("Invoking function {}:{}", functionId.getId(), functionId.getVersion());
+
     onServiceStarted(context);
     registerConsumers();
   }
@@ -134,6 +138,11 @@ public abstract class StreamPipesFunction implements IStreamPipesFunctionDeclare
         SpLogEntry.from(System.currentTimeMillis(), StreamPipesErrorMessage.from(e)));
   }
 
+  private void initializeProducers() {
+    this.outputCollectors = this.getOutputCollectors();
+    this.outputCollectors.forEach((key, value) -> value.connect());
+  }
+
   private Map<String, SpOutputCollector> getOutputCollectors() {
     this.getFunctionConfig().getOutputDataStreams().forEach((key, value) -> {
       this.outputCollectors.put(

Reply via email to