This is an automated email from the ASF dual-hosted git repository.

riemer pushed a commit to branch 3283-opc-ua-adapter-should-send-incomplete-events-in-case-of-bad-status-code
in repository https://gitbox.apache.org/repos/asf/streampipes.git


The following commit(s) were added to refs/heads/3283-opc-ua-adapter-should-send-incomplete-events-in-case-of-bad-status-code by this push:
     new 94be920869 feat(#3283): Allow incomplete events in OPC-UA adapter
94be920869 is described below

commit 94be92086999139ba96ad14d8e5cb9aabf7cbd4a
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Oct 4 11:19:29 2024 +0200

    feat(#3283): Allow incomplete events in OPC-UA adapter
---
 .../opcua/OpcUaConnectorsModuleExport.java         |  4 +-
 .../connectors/opcua/adapter/OpcUaAdapter.java     | 17 ++++--
 .../opcua/config/OpcUaAdapterConfig.java           |  9 +++
 .../opcua/config/SharedUserConfiguration.java      | 31 ++++++++++
 .../opcua/config/SpOpcUaConfigExtractor.java       |  4 ++
 .../opcua/migration/OpcUaAdapterMigrationV2.java   |  3 +-
 .../opcua/migration/OpcUaAdapterMigrationV3.java   | 68 ++++++++++++++++++++++
 .../strings.en                                     |  3 +
 .../runtime/StandaloneEventProcessorRuntime.java   |  3 +
 9 files changed, 135 insertions(+), 7 deletions(-)

diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index d551d979d0..63aacdab58 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -25,6 +25,7 @@ import 
org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
 import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
 import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
 import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
+import 
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
 import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
 
 import java.util.List;
@@ -48,7 +49,8 @@ public class OpcUaConnectorsModuleExport implements 
IExtensionModuleExport {
   public List<IModelMigrator<?, ?>> migrators() {
     return List.of(
         new OpcUaAdapterMigrationV1(),
-        new OpcUaAdapterMigrationV2()
+        new OpcUaAdapterMigrationV2(),
+        new OpcUaAdapterMigrationV3()
     );
   }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index 5978f5ad24..baf5cfe95b 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -74,6 +74,7 @@ import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil
 public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter, SupportsRuntimeConfig {
 
   public static final String ID = "org.apache.streampipes.connect.iiot.adapters.opcua";
+  public static final String PULL_GROUP = "pull-mode-group";
   private static final Logger LOG = LoggerFactory.getLogger(OpcUaAdapter.class);
 
   private int pullingIntervalMilliSeconds;
@@ -154,17 +155,23 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
           this.event.put(this.allNodes.get(i).getLabel(), value);
         } else {
           badStatusCodeReceived = true;
-          LOG.warn("Received status code {} for node label: {} - event will not be sent",
+          LOG.warn("Received status code {} for node label: {}",
               status,
               this.allNodes.get(i).getLabel());
         }
       }
     }
-    if (!badStatusCodeReceived && !emptyValueReceived) {
+    if (!emptyValueReceived && !shouldSkipEvent(badStatusCodeReceived)) {
       collector.collect(this.event);
     }
   }
 
+  private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
+    return badStatusCodeReceived
+        && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+        .equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
+  }
+
   public void onSubscriptionValue(UaMonitoredItem item,
                                   DataValue value) {
 
@@ -231,14 +238,14 @@ public class OpcUaAdapter implements StreamPipesAdapter, 
IPullAdapter, SupportsR
 
   @Override
   public IAdapterConfiguration declareConfig() {
-    var builder = AdapterConfigurationBuilder.create(ID, 2, OpcUaAdapter::new)
+    var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
         .withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
         .withLocales(Locales.EN)
         .withCategory(AdapterType.Generic, AdapterType.Manufacturing)
         .requiredAlternatives(Labels.withId(ADAPTER_TYPE),
             Alternatives.from(Labels.withId(PULL_MODE),
-                StaticProperties.integerFreeTextProperty(
-                    Labels.withId(PULLING_INTERVAL))),
+                SharedUserConfiguration.getPullModeGroup()
+            ),
             Alternatives.from(Labels.withId(SUBSCRIPTION_MODE)));
     SharedUserConfiguration.appendSharedOpcUaConfig(builder, true);
     return builder.buildConfiguration();
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
index c6736ff8c3..e2758c8c03 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
@@ -21,6 +21,7 @@ package 
org.apache.streampipes.extensions.connectors.opcua.config;
 public class OpcUaAdapterConfig extends OpcUaConfig {
 
   private Integer pullIntervalMilliSeconds;
+  private String incompleteEventStrategy;
 
   public Integer getPullIntervalMilliSeconds() {
     return pullIntervalMilliSeconds;
@@ -33,4 +34,12 @@ public class OpcUaAdapterConfig extends OpcUaConfig {
   public boolean inPullMode() {
     return pullIntervalMilliSeconds != null;
   }
+
+  public String getIncompleteEventStrategy() {
+    return incompleteEventStrategy;
+  }
+
+  public void setIncompleteEventStrategy(String incompleteEventStrategy) {
+    this.incompleteEventStrategy = incompleteEventStrategy;
+  }
 }
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index cb95919174..9b5f541071 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,9 @@
 
 package org.apache.streampipes.extensions.connectors.opcua.config;
 
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
 import org.apache.streampipes.sdk.StaticProperties;
 import 
org.apache.streampipes.sdk.builder.AbstractConfigurablePipelineElementBuilder;
 import org.apache.streampipes.sdk.helpers.Alternatives;
@@ -25,6 +28,7 @@ import org.apache.streampipes.sdk.helpers.Labels;
 
 import java.util.List;
 
+import static 
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
@@ -36,12 +40,17 @@ import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_SERVER_URL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
 import static 
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
 
 public class SharedUserConfiguration {
 
+  public static final String INCOMPLETE_EVENT_HANDLING_KEY = "incomplete-event-handling";
+  public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
+  public static final String INCOMPLETE_OPTION_SEND = "send-event";
+
   public static void 
appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?> 
builder,
                                              boolean adapterConfig) {
 
@@ -81,6 +90,28 @@ public class SharedUserConfiguration {
         );
   }
 
+  public static StaticPropertyGroup getPullModeGroup() {
+    var group = StaticProperties.group(
+        Labels.withId(PULL_GROUP),
+        false,
+        StaticProperties.integerFreeTextProperty(
+            Labels.withId(PULLING_INTERVAL)),
+        getIncompleteEventConfig()
+    );
+    group.setHorizontalRendering(false);
+    return group;
+  }
+
+  public static OneOfStaticProperty getIncompleteEventConfig() {
+    return StaticProperties.singleValueSelection(
+      Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+        List.of(
+            new Option("Ignore (only complete messages are sent)", INCOMPLETE_OPTION_IGNORE),
+            new Option("Send (incomplete messages are sent)", INCOMPLETE_OPTION_SEND)
+        )
+    );
+  }
+
   public static List<String> getDependsOn(boolean adapterConfig) {
     return adapterConfig ? List.of(
         ADAPTER_TYPE.name(),
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 26cae21569..6741a3e6f8 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -55,8 +55,12 @@ public class SpOpcUaConfigExtractor {
     if (usePullMode) {
       Integer pullIntervalSeconds =
           extractor.singleValueParameter(PULLING_INTERVAL.name(), Integer.class);
+      var incompleteEventStrategy = extractor.selectedSingleValueInternalName(
+          SharedUserConfiguration.INCOMPLETE_EVENT_HANDLING_KEY, String.class
+      );
 
       config.setPullIntervalMilliSeconds(pullIntervalSeconds);
+      config.setIncompleteEventStrategy(incompleteEventStrategy);
     }
 
     return config;
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
index 382d1e3514..23d643acbb 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
@@ -20,6 +20,7 @@ package 
org.apache.streampipes.extensions.connectors.opcua.migration;
 
 import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
 import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
 import org.apache.streampipes.model.connect.adapter.AdapterDescription;
 import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
 import org.apache.streampipes.model.migration.MigrationResult;
@@ -38,7 +39,7 @@ public class OpcUaAdapterMigrationV2 implements 
IAdapterMigrator {
   @Override
   public ModelMigratorConfig config() {
     return new ModelMigratorConfig(
-        "org.apache.streampipes.connect.iiot.adapters.opcua",
+        OpcUaAdapter.ID,
         SpServiceTagPrefix.ADAPTER,
         1,
         2
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
new file mode 100644
index 0000000000..051dfe5196
--- /dev/null
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import 
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import 
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class OpcUaAdapterMigrationV3 implements IAdapterMigrator {
+  @Override
+  public ModelMigratorConfig config() {
+    return new ModelMigratorConfig(
+        OpcUaAdapter.ID,
+        SpServiceTagPrefix.ADAPTER,
+        2,
+        3
+    );
+  }
+
+  @Override
+  public MigrationResult<AdapterDescription> migrate(AdapterDescription element, IStaticPropertyExtractor extractor) throws RuntimeException {
+    var oneOfProperty = SharedUserConfiguration.getIncompleteEventConfig();
+    oneOfProperty.getOptions().get(0).setSelected(true);
+    element.getConfig().forEach(sp -> {
+      if (sp.getInternalName().equalsIgnoreCase(OpcUaLabels.ADAPTER_TYPE.name())) {
+        var alternatives = (StaticPropertyAlternatives) sp;
+        var pullAlternative = alternatives.getAlternatives().get(0);
+        var pullInterval = pullAlternative.getStaticProperty();
+        var group = StaticProperties.group(
+            Labels.withId(OpcUaAdapter.PULL_GROUP),
+            false,
+            pullInterval,
+            oneOfProperty
+        );
+        group.setHorizontalRendering(false);
+        pullAlternative.setStaticProperty(
+            group
+        );
+      }
+    });
+    return MigrationResult.success(element);
+  }
+}
diff --git 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index c874137625..995045ec7a 100644
--- 
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++ 
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -67,3 +67,6 @@ PULL_MODE.description=
 
 SUBSCRIPTION_MODE.title=Subscription mode
 SUBSCRIPTION_MODE.description=
+
+incomplete-event-handling.title=Incomplete Events
+incomplete-event-handling.description=Select how events with missing values (e.g., due to bad status codes) are handled.
diff --git 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index b9ce761f59..54d9a0d3e2 100644
--- 
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++ 
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -68,6 +68,9 @@ public class StandaloneEventProcessorRuntime extends 
StandalonePipelineElementRu
       var event = this.internalRuntimeParameters.makeEvent(runtimeParameters, rawEvent, sourceInfo);
       pipelineElement
           .onEvent(event, outputCollector);
+    } catch (IllegalArgumentException e) {
+      LOG.warn("A key could not be found - this can be due to an operation on a missing field.");
+      addLogEntry(e);
     } catch (RuntimeException e) {
       LOG.error("RuntimeException while processing event in {}", pipelineElement.getClass().getCanonicalName(), e);
       addLogEntry(e);

Reply via email to