This is an automated email from the ASF dual-hosted git repository.
riemer pushed a commit to branch
3283-opc-ua-adapter-should-send-incomplete-events-in-case-of-bad-status-code
in repository https://gitbox.apache.org/repos/asf/streampipes.git
The following commit(s) were added to
refs/heads/3283-opc-ua-adapter-should-send-incomplete-events-in-case-of-bad-status-code
by this push:
new 94be920869 feat(#3283): Allow incomplete events in OPC-UA adapter
94be920869 is described below
commit 94be92086999139ba96ad14d8e5cb9aabf7cbd4a
Author: Dominik Riemer <[email protected]>
AuthorDate: Fri Oct 4 11:19:29 2024 +0200
feat(#3283): Allow incomplete events in OPC-UA adapter
---
.../opcua/OpcUaConnectorsModuleExport.java | 4 +-
.../connectors/opcua/adapter/OpcUaAdapter.java | 17 ++++--
.../opcua/config/OpcUaAdapterConfig.java | 9 +++
.../opcua/config/SharedUserConfiguration.java | 31 ++++++++++
.../opcua/config/SpOpcUaConfigExtractor.java | 4 ++
.../opcua/migration/OpcUaAdapterMigrationV2.java | 3 +-
.../opcua/migration/OpcUaAdapterMigrationV3.java | 68 ++++++++++++++++++++++
.../strings.en | 3 +
.../runtime/StandaloneEventProcessorRuntime.java | 3 +
9 files changed, 135 insertions(+), 7 deletions(-)
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
index d551d979d0..63aacdab58 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/OpcUaConnectorsModuleExport.java
@@ -25,6 +25,7 @@ import
org.apache.streampipes.extensions.api.pe.IStreamPipesPipelineElement;
import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV1;
import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV2;
+import
org.apache.streampipes.extensions.connectors.opcua.migration.OpcUaAdapterMigrationV3;
import org.apache.streampipes.extensions.connectors.opcua.sink.OpcUaSink;
import java.util.List;
@@ -48,7 +49,8 @@ public class OpcUaConnectorsModuleExport implements
IExtensionModuleExport {
public List<IModelMigrator<?, ?>> migrators() {
return List.of(
new OpcUaAdapterMigrationV1(),
- new OpcUaAdapterMigrationV2()
+ new OpcUaAdapterMigrationV2(),
+ new OpcUaAdapterMigrationV3()
);
}
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
index 5978f5ad24..baf5cfe95b 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/adapter/OpcUaAdapter.java
@@ -74,6 +74,7 @@ import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaUtil
public class OpcUaAdapter implements StreamPipesAdapter, IPullAdapter,
SupportsRuntimeConfig {
public static final String ID =
"org.apache.streampipes.connect.iiot.adapters.opcua";
+ public static final String PULL_GROUP = "pull-mode-group";
private static final Logger LOG =
LoggerFactory.getLogger(OpcUaAdapter.class);
private int pullingIntervalMilliSeconds;
@@ -154,17 +155,23 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
this.event.put(this.allNodes.get(i).getLabel(), value);
} else {
badStatusCodeReceived = true;
- LOG.warn("Received status code {} for node label: {} - event will
not be sent",
+ LOG.warn("Received status code {} for node label: {}",
status,
this.allNodes.get(i).getLabel());
}
}
}
- if (!badStatusCodeReceived && !emptyValueReceived) {
+ if (!emptyValueReceived && !shouldSkipEvent(badStatusCodeReceived)) {
collector.collect(this.event);
}
}
+ private boolean shouldSkipEvent(boolean badStatusCodeReceived) {
+ return badStatusCodeReceived
+ && this.spOpcUaClient.getSpOpcConfig().getIncompleteEventStrategy()
+ .equalsIgnoreCase(SharedUserConfiguration.INCOMPLETE_OPTION_IGNORE);
+ }
+
public void onSubscriptionValue(UaMonitoredItem item,
DataValue value) {
@@ -231,14 +238,14 @@ public class OpcUaAdapter implements StreamPipesAdapter,
IPullAdapter, SupportsR
@Override
public IAdapterConfiguration declareConfig() {
- var builder = AdapterConfigurationBuilder.create(ID, 2, OpcUaAdapter::new)
+ var builder = AdapterConfigurationBuilder.create(ID, 3, OpcUaAdapter::new)
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
.withLocales(Locales.EN)
.withCategory(AdapterType.Generic, AdapterType.Manufacturing)
.requiredAlternatives(Labels.withId(ADAPTER_TYPE),
Alternatives.from(Labels.withId(PULL_MODE),
- StaticProperties.integerFreeTextProperty(
- Labels.withId(PULLING_INTERVAL))),
+ SharedUserConfiguration.getPullModeGroup()
+ ),
Alternatives.from(Labels.withId(SUBSCRIPTION_MODE)));
SharedUserConfiguration.appendSharedOpcUaConfig(builder, true);
return builder.buildConfiguration();
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
index c6736ff8c3..e2758c8c03 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/OpcUaAdapterConfig.java
@@ -21,6 +21,7 @@ package
org.apache.streampipes.extensions.connectors.opcua.config;
public class OpcUaAdapterConfig extends OpcUaConfig {
private Integer pullIntervalMilliSeconds;
+ private String incompleteEventStrategy;
public Integer getPullIntervalMilliSeconds() {
return pullIntervalMilliSeconds;
@@ -33,4 +34,12 @@ public class OpcUaAdapterConfig extends OpcUaConfig {
public boolean inPullMode() {
return pullIntervalMilliSeconds != null;
}
+
+ public String getIncompleteEventStrategy() {
+ return incompleteEventStrategy;
+ }
+
+ public void setIncompleteEventStrategy(String incompleteEventStrategy) {
+ this.incompleteEventStrategy = incompleteEventStrategy;
+ }
}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
index cb95919174..9b5f541071 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SharedUserConfiguration.java
@@ -18,6 +18,9 @@
package org.apache.streampipes.extensions.connectors.opcua.config;
+import org.apache.streampipes.model.staticproperty.OneOfStaticProperty;
+import org.apache.streampipes.model.staticproperty.Option;
+import org.apache.streampipes.model.staticproperty.StaticPropertyGroup;
import org.apache.streampipes.sdk.StaticProperties;
import
org.apache.streampipes.sdk.builder.AbstractConfigurablePipelineElementBuilder;
import org.apache.streampipes.sdk.helpers.Alternatives;
@@ -25,6 +28,7 @@ import org.apache.streampipes.sdk.helpers.Labels;
import java.util.List;
+import static
org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter.PULL_GROUP;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ACCESS_MODE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.ADAPTER_TYPE;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.AVAILABLE_NODES;
@@ -36,12 +40,17 @@ import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabe
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_SERVER_URL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.OPC_URL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PASSWORD;
+import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.PULLING_INTERVAL;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.UNAUTHENTICATED;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME;
import static
org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels.USERNAME_GROUP;
public class SharedUserConfiguration {
+ public static final String INCOMPLETE_EVENT_HANDLING_KEY =
"incomplete-event-handling";
+ public static final String INCOMPLETE_OPTION_IGNORE = "ignore-event";
+ public static final String INCOMPLETE_OPTION_SEND = "send-event";
+
public static void
appendSharedOpcUaConfig(AbstractConfigurablePipelineElementBuilder<?, ?>
builder,
boolean adapterConfig) {
@@ -81,6 +90,28 @@ public class SharedUserConfiguration {
);
}
+ public static StaticPropertyGroup getPullModeGroup() {
+ var group = StaticProperties.group(
+ Labels.withId(PULL_GROUP),
+ false,
+ StaticProperties.integerFreeTextProperty(
+ Labels.withId(PULLING_INTERVAL)),
+ getIncompleteEventConfig()
+ );
+ group.setHorizontalRendering(false);
+ return group;
+ }
+
+ public static OneOfStaticProperty getIncompleteEventConfig() {
+ return StaticProperties.singleValueSelection(
+ Labels.withId(INCOMPLETE_EVENT_HANDLING_KEY),
+ List.of(
+ new Option("Ignore (only complete messages are sent)",
INCOMPLETE_OPTION_IGNORE),
+ new Option("Send (incomplete messages are sent)",
INCOMPLETE_OPTION_SEND)
+ )
+ );
+ }
+
public static List<String> getDependsOn(boolean adapterConfig) {
return adapterConfig ? List.of(
ADAPTER_TYPE.name(),
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
index 26cae21569..6741a3e6f8 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/config/SpOpcUaConfigExtractor.java
@@ -55,8 +55,12 @@ public class SpOpcUaConfigExtractor {
if (usePullMode) {
Integer pullIntervalSeconds =
extractor.singleValueParameter(PULLING_INTERVAL.name(),
Integer.class);
+ var incompleteEventStrategy = extractor.selectedSingleValueInternalName(
+ SharedUserConfiguration.INCOMPLETE_EVENT_HANDLING_KEY, String.class
+ );
config.setPullIntervalMilliSeconds(pullIntervalSeconds);
+ config.setIncompleteEventStrategy(incompleteEventStrategy);
}
return config;
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
index 382d1e3514..23d643acbb 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV2.java
@@ -20,6 +20,7 @@ package
org.apache.streampipes.extensions.connectors.opcua.migration;
import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
import org.apache.streampipes.model.migration.MigrationResult;
@@ -38,7 +39,7 @@ public class OpcUaAdapterMigrationV2 implements
IAdapterMigrator {
@Override
public ModelMigratorConfig config() {
return new ModelMigratorConfig(
- "org.apache.streampipes.connect.iiot.adapters.opcua",
+ OpcUaAdapter.ID,
SpServiceTagPrefix.ADAPTER,
1,
2
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
new file mode 100644
index 0000000000..051dfe5196
--- /dev/null
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/java/org/apache/streampipes/extensions/connectors/opcua/migration/OpcUaAdapterMigrationV3.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.extensions.connectors.opcua.migration;
+
+import
org.apache.streampipes.extensions.api.extractor.IStaticPropertyExtractor;
+import org.apache.streampipes.extensions.api.migration.IAdapterMigrator;
+import org.apache.streampipes.extensions.connectors.opcua.adapter.OpcUaAdapter;
+import
org.apache.streampipes.extensions.connectors.opcua.config.SharedUserConfiguration;
+import org.apache.streampipes.extensions.connectors.opcua.utils.OpcUaLabels;
+import org.apache.streampipes.model.connect.adapter.AdapterDescription;
+import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix;
+import org.apache.streampipes.model.migration.MigrationResult;
+import org.apache.streampipes.model.migration.ModelMigratorConfig;
+import org.apache.streampipes.model.staticproperty.StaticPropertyAlternatives;
+import org.apache.streampipes.sdk.StaticProperties;
+import org.apache.streampipes.sdk.helpers.Labels;
+
+public class OpcUaAdapterMigrationV3 implements IAdapterMigrator {
+ @Override
+ public ModelMigratorConfig config() {
+ return new ModelMigratorConfig(
+ OpcUaAdapter.ID,
+ SpServiceTagPrefix.ADAPTER,
+ 2,
+ 3
+ );
+ }
+
+ @Override
+ public MigrationResult<AdapterDescription> migrate(AdapterDescription
element, IStaticPropertyExtractor extractor) throws RuntimeException {
+ var oneOfProperty = SharedUserConfiguration.getIncompleteEventConfig();
+ oneOfProperty.getOptions().get(0).setSelected(true);
+ element.getConfig().forEach(sp -> {
+ if
(sp.getInternalName().equalsIgnoreCase(OpcUaLabels.ADAPTER_TYPE.name())) {
+ var alternatives = (StaticPropertyAlternatives) sp;
+ var pullAlternative = alternatives.getAlternatives().get(0);
+ var pullInterval = pullAlternative.getStaticProperty();
+ var group = StaticProperties.group(
+ Labels.withId(OpcUaAdapter.PULL_GROUP),
+ false,
+ pullInterval,
+ oneOfProperty
+ );
+ group.setHorizontalRendering(false);
+ pullAlternative.setStaticProperty(
+ group
+ );
+ }
+ });
+ return MigrationResult.success(element);
+ }
+}
diff --git
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
index c874137625..995045ec7a 100644
---
a/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
+++
b/streampipes-extensions/streampipes-connectors-opcua/src/main/resources/org.apache.streampipes.connect.iiot.adapters.opcua/strings.en
@@ -67,3 +67,6 @@ PULL_MODE.description=
SUBSCRIPTION_MODE.title=Subscription mode
SUBSCRIPTION_MODE.description=
+
+incomplete-event-handling.title=Incomplete Events
+incomplete-event-handling.description=Select how events with missing values
(e.g., due to bad status codes) are handled.
diff --git
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
index b9ce761f59..54d9a0d3e2 100644
---
a/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
+++
b/streampipes-wrapper-standalone/src/main/java/org/apache/streampipes/wrapper/standalone/runtime/StandaloneEventProcessorRuntime.java
@@ -68,6 +68,9 @@ public class StandaloneEventProcessorRuntime extends
StandalonePipelineElementRu
var event = this.internalRuntimeParameters.makeEvent(runtimeParameters,
rawEvent, sourceInfo);
pipelineElement
.onEvent(event, outputCollector);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("A key could not be found - this can be due to an operation on
a missing field.");
+ addLogEntry(e);
} catch (RuntimeException e) {
LOG.error("RuntimeException while processing event in {}",
pipelineElement.getClass().getCanonicalName(), e);
addLogEntry(e);