This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch cp-opc-client
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/cp-opc-client by this push:
new bde1028e788 fix
bde1028e788 is described below
commit bde1028e788831d8407bfc5aca6262290240504f
Author: Caideyipi <[email protected]>
AuthorDate: Thu Mar 26 16:53:25 2026 +0800
fix
---
.../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java | 1 -
.../db/pipe/sink/protocol/opcua/OpcUaSink.java | 56 ++-----
.../protocol/opcua/client/IoTDBOpcUaClient.java | 8 +-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 163 +++++++--------------
4 files changed, 67 insertions(+), 161 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index c31627f6143..4b4c69b951d 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -29,7 +29,6 @@ import org.apache.iotdb.it.env.cluster.EnvUtils;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT1;
import org.apache.iotdb.pipe.api.exception.PipeException;
-import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.tsfile.common.conf.TSFileConfig;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
index 00f52bb5073..6d44322a3d5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/OpcUaSink.java
@@ -26,8 +26,6 @@ import
org.apache.iotdb.db.pipe.sink.protocol.opcua.client.ClientRunner;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.client.IoTDBOpcUaClient;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaNameSpace;
import org.apache.iotdb.db.pipe.sink.protocol.opcua.server.OpcUaServerBuilder;
-import org.apache.iotdb.db.storageengine.StorageEngine;
-import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -48,6 +46,8 @@ import
org.eclipse.milo.opcua.stack.core.types.builtin.StatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.File;
import java.util.Arrays;
import java.util.Map;
@@ -78,8 +78,6 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_MODEL_PUB_SUB_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_NODE_URL_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_PLACEHOLDER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_QUALITY_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_DIR_DEFAULT_VALUE;
@@ -94,6 +92,10 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CON
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_SECURITY_POLICY_SERVER_DEFAULT_VALUES;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_DEFAULT_VALUE;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_VALUE_NAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_DEFAULT_VALUE;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_OPC_UA_WITH_QUALITY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_PASSWORD_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USERNAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_IOTDB_USER_KEY;
@@ -103,11 +105,12 @@ import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SIN
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_HTTPS_BIND_PORT_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_MODEL_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_NODE_URL_KEY;
-import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_PLACEHOLDER_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_QUALITY_NAME_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_DIR_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_SECURITY_POLICY_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_TCP_BIND_PORT_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_VALUE_NAME_KEY;
+import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.SINK_OPC_UA_WITH_QUALITY_KEY;
/**
* Send data in IoTDB based on Opc Ua protocol, using Eclipse Milo. All data
are converted into
@@ -208,29 +211,12 @@ public class OpcUaSink implements PipeConnector {
Arrays.asList(CONNECTOR_OPC_UA_MODEL_KEY,
SINK_OPC_UA_MODEL_KEY),
CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE);
- placeHolder4NullTag =
- parameters.getStringOrDefault(
- Arrays.asList(CONNECTOR_OPC_UA_PLACEHOLDER_KEY,
SINK_OPC_UA_PLACEHOLDER_KEY),
- CONNECTOR_OPC_UA_PLACEHOLDER_4_NULL_TAG_DEFAULT_VALUE);
- final DataRegion region =
- StorageEngine.getInstance()
- .getDataRegion(new
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
- databaseName = Objects.nonNull(region) ? region.getDatabaseName() :
"root.__temp_db";
-
- if (withQuality && PathUtils.isTableModelDatabase(databaseName)) {
- throw new PipeException(
- "When the OPC UA sink sets 'with-quality' to true, the table model
data is not supported.");
- }
final String nodeUrl =
parameters.getStringByKeys(CONNECTOR_OPC_UA_NODE_URL_KEY,
SINK_OPC_UA_NODE_URL_KEY);
if (Objects.isNull(nodeUrl)) {
customizeServer(parameters);
} else {
- if (PathUtils.isTableModelDatabase(databaseName)) {
- throw new PipeException(
- "When the OPC UA sink points to an outer server, the table model
data is not supported.");
- }
customizeClient(nodeUrl, parameters);
}
}
@@ -308,16 +294,7 @@ public class OpcUaSink implements PipeConnector {
.setEnableAnonymousAccess(enableAnonymousAccess)
.setSecurityPolicies(securityPolicies);
final OpcUaServer newServer = builder.build();
- nameSpace =
- new OpcUaNameSpace(
- newServer,
- parameters
- .getStringOrDefault(
- Arrays.asList(
- CONNECTOR_OPC_UA_MODEL_KEY,
SINK_OPC_UA_MODEL_KEY),
- CONNECTOR_OPC_UA_MODEL_DEFAULT_VALUE)
-
.equals(CONNECTOR_OPC_UA_MODEL_CLIENT_SERVER_VALUE),
- builder);
+ nameSpace = new OpcUaNameSpace(newServer, builder);
nameSpace.startup();
newServer.startup().get();
return new Pair<>(new AtomicInteger(0), nameSpace);
@@ -434,9 +411,12 @@ public class OpcUaSink implements PipeConnector {
@Override
public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws
Exception {
- transferByTablet(tabletInsertionEvent, LOGGER, tablet -> {
+ transferByTablet(
+ tabletInsertionEvent,
+ LOGGER,
+ tablet -> {
if (Objects.nonNull(nameSpace)) {
- nameSpace.transfer(tablet);
+ nameSpace.transfer(tablet, this);
} else if (Objects.nonNull(client)) {
client.transfer(tablet, this);
} else {
@@ -540,14 +520,6 @@ public class OpcUaSink implements PipeConnector {
return isClientServerModel;
}
- public String getDatabaseName() {
- return databaseName;
- }
-
- public String getPlaceHolder4NullTag() {
- return placeHolder4NullTag;
- }
-
@Nullable
public String getValueName() {
return valueName;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
index bf96d988180..94889be906e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/client/IoTDBOpcUaClient.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.write.record.Tablet;
-import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.sdk.core.AccessLevel;
@@ -99,12 +99,12 @@ public class IoTDBOpcUaClient {
// Only support tree model & client-server
public void transfer(final Tablet tablet, final OpcUaSink sink) throws
Exception {
OpcUaNameSpace.transferTabletForClientServerModel(
- tablet, false, sink, this::transferTabletRowForClientServerModel);
+ tablet, sink, this::transferTabletRowForClientServerModel);
}
private void transferTabletRowForClientServerModel(
final String[] segments,
- final List<IMeasurementSchema> measurementSchemas,
+ final List<MeasurementSchema> measurementSchemas,
final List<Long> timestamps,
final List<Object> values,
final OpcUaSink sink)
@@ -119,7 +119,7 @@ public class IoTDBOpcUaClient {
if (Objects.isNull(values.get(i))) {
continue;
}
- final String name = measurementSchemas.get(i).getMeasurementName();
+ final String name = measurementSchemas.get(i).getMeasurementId();
final TSDataType type = measurementSchemas.get(i).getType();
if (Objects.nonNull(sink.getQualityName()) &&
sink.getQualityName().equals(name)) {
if (!type.equals(TSDataType.BOOLEAN)) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index c6db63bd913..04cc251d655 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -62,6 +62,8 @@ import java.nio.file.Paths;
import java.sql.Date;
import java.time.LocalDate;
import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -70,16 +72,11 @@ import java.util.UUID;
public class OpcUaNameSpace extends ManagedNamespaceWithLifecycle {
private static final Logger LOGGER =
LoggerFactory.getLogger(OpcUaNameSpace.class);
public static final String NAMESPACE_URI = "urn:apache:iotdb:opc-server";
- private final boolean isClientServerModel;
private final SubscriptionModel subscriptionModel;
private final OpcUaServerBuilder builder;
- public OpcUaNameSpace(
- final OpcUaServer server,
- final boolean isClientServerModel,
- final OpcUaServerBuilder builder) {
+ public OpcUaNameSpace(final OpcUaServer server, final OpcUaServerBuilder
builder) {
super(server, NAMESPACE_URI);
- this.isClientServerModel = isClientServerModel;
this.builder = builder;
subscriptionModel = new SubscriptionModel(server, this);
@@ -100,81 +97,43 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
});
}
- public void transfer(final Tablet tablet, final boolean isTableModel, final
OpcUaSink sink)
- throws Exception {
+ public void transfer(final Tablet tablet, final OpcUaSink sink) throws
Exception {
if (sink.isClientServerModel()) {
- transferTabletForClientServerModel(
- tablet, isTableModel, sink,
this::transferTabletRowForClientServerModel);
+ transferTabletForClientServerModel(tablet, sink,
this::transferTabletRowForClientServerModel);
} else {
- transferTabletForPubSubModel(tablet, isTableModel, sink);
+ transferTabletForPubSubModel(tablet);
}
}
public static void transferTabletForClientServerModel(
- final Tablet tablet,
- final boolean isTableModel,
- final OpcUaSink sink,
- final TabletRowConsumer consumer)
+ final Tablet tablet, final OpcUaSink sink, final TabletRowConsumer
consumer)
throws Exception {
- final List<IMeasurementSchema> schemas = tablet.getSchemas();
- final List<IMeasurementSchema> newSchemas = new ArrayList<>();
- if (!isTableModel) {
- new
PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
-
- final List<Long> timestamps = new ArrayList<>();
- final List<Object> values = new ArrayList<>();
-
- for (int i = 0; i < schemas.size(); ++i) {
- for (int j = tablet.getRowSize() - 1; j >= 0; --j) {
- if (!tablet.isNull(j, i)) {
- newSchemas.add(schemas.get(i));
- timestamps.add(tablet.getTimestamp(j));
- values.add(
- getTabletObjectValue4Opc(tablet.getValues()[i], j,
schemas.get(i).getType()));
- break;
- }
- }
- }
+ final List<MeasurementSchema> schemas = tablet.getSchemas();
+ final List<MeasurementSchema> newSchemas = new ArrayList<>();
+ new
PipeTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
- consumer.accept(tablet.getDeviceId().split("\\."), newSchemas,
timestamps, values, sink);
- } else {
- transferTabletForPubSubModel(tablet);
- }
- }
+ final List<Long> timestamps = new ArrayList<>();
+ final List<Object> values = new ArrayList<>();
- for (int i = 0; i < tablet.getRowSize(); ++i) {
- final Object[] segments = tablet.getDeviceID(i).getSegments();
- final String[] folderSegments = new String[segments.length + 1];
- folderSegments[0] = sink.getDatabaseName();
-
- for (int j = 0; j < segments.length; ++j) {
- folderSegments[j + 1] =
- Objects.isNull(segments[j]) ? sink.getPlaceHolder4NullTag() :
(String) segments[j];
+ for (int i = 0; i < schemas.size(); ++i) {
+ for (int j = tablet.rowSize - 1; j >= 0; --j) {
+ if (tablet.bitMaps == null || tablet.bitMaps[i] == null ||
!tablet.bitMaps[i].isMarked(j)) {
+ newSchemas.add(schemas.get(i));
+ timestamps.add(tablet.timestamps[j]);
+ values.add(getTabletObjectValue4Opc(tablet.values[i], j,
schemas.get(i).getType()));
+ break;
}
-
- final int finalI = i;
- consumer.accept(
- folderSegments,
- newSchemas,
- Collections.singletonList(tablet.getTimestamp(i)),
- columnIndexes.stream()
- .map(
- index ->
- tablet.isNull(finalI, index)
- ? null
- : getTabletObjectValue4Opc(
- tablet.getValues()[index], finalI,
schemas.get(index).getType()))
- .collect(Collectors.toList()),
- sink);
}
}
+
+ consumer.accept(tablet.deviceId.split("\\."), newSchemas, timestamps,
values, sink);
}
@FunctionalInterface
public interface TabletRowConsumer {
void accept(
final String[] segments,
- final List<IMeasurementSchema> measurementSchemas,
+ final List<MeasurementSchema> measurementSchemas,
final List<Long> timestamps,
final List<Object> values,
final OpcUaSink sink)
@@ -183,7 +142,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
private void transferTabletRowForClientServerModel(
final String[] segments,
- final List<IMeasurementSchema> measurementSchemas,
+ final List<MeasurementSchema> measurementSchemas,
final List<Long> timestamps,
final List<Object> values,
final OpcUaSink sink) {
@@ -235,7 +194,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
() ->
new PipeRuntimeCriticalException(
String.format(
- "The folder node for %s does not exist.",
tablet.deviceId)));
+ "The folder node for %s does not exist.",
+ Arrays.toString(segments))));
}
}
@@ -250,7 +210,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
if (Objects.isNull(values.get(i))) {
continue;
}
- final String name = measurementSchemas.get(i).getMeasurementName();
+ final String name = measurementSchemas.get(i).getMeasurementId();
final TSDataType type = measurementSchemas.get(i).getType();
if (Objects.nonNull(sink.getQualityName()) &&
sink.getQualityName().equals(name)) {
if (!type.equals(TSDataType.BOOLEAN)) {
@@ -270,37 +230,25 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
Objects.isNull(sink.getValueName()) ? name :
segments[segments.length - 1];
final NodeId nodeId = newNodeId(currentFolder + nodeName);
final UaVariableNode measurementNode;
-
- int lastNonnullIndex = -1;
- for (int j = tablet.rowSize - 1; j >= 0; --j) {
- if (!tablet.bitMaps[i].isMarked(j)) {
- lastNonnullIndex = j;
- break;
- }
- }
-
- if (lastNonnullIndex == -1) {
- continue;
- }
-
- final long utcTimestamp =
timestampToUtc(tablet.timestamps[lastNonnullIndex]);
- final DataValue value =
+ final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
+ final DataValue dataValue =
new DataValue(
- new Variant(getTabletObjectValue4Opc(tablet.values[i],
lastNonnullIndex, type)),
- StatusCode.GOOD,
+ new Variant(values.get(i)),
+ currentQuality,
new DateTime(utcTimestamp),
new DateTime());
+
if (!getNodeManager().containsNode(nodeId)) {
measurementNode =
new UaVariableNode.UaVariableNodeBuilder(getNodeContext())
- .setNodeId(newNodeId(currentFolder + name))
+ .setNodeId(nodeId)
.setAccessLevel(AccessLevel.READ_WRITE)
.setUserAccessLevel(AccessLevel.READ_ONLY)
- .setBrowseName(newQualifiedName(name))
- .setDisplayName(LocalizedText.english(name))
+ .setBrowseName(newQualifiedName(nodeName))
+ .setDisplayName(LocalizedText.english(nodeName))
.setDataType(convertToOpcDataType(type))
.setTypeDefinition(Identifiers.BaseDataVariableType)
- .setValue(value)
+ .setValue(dataValue)
.build();
getNodeManager().addNode(measurementNode);
if (Objects.nonNull(folderNode)) {
@@ -324,17 +272,11 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
String.format("The Node %s does not exist.",
nodeId)));
}
- final long utcTimestamp =
timestampToUtc(timestamps.get(timestamps.size() > 1 ? i : 0));
if (Objects.isNull(sink.getValueName())) {
if (Objects.isNull(measurementNode.getValue())
- ||
Objects.requireNonNull(measurementNode.getValue().getSourceTime()).getUtcTime()
- < utcTimestamp) {
- measurementNode.setValue(
- new DataValue(
- new Variant(values.get(i)),
- currentQuality,
- new DateTime(utcTimestamp),
- new DateTime()));
+ || Objects.isNull(measurementNode.getValue().getSourceTime())
+ || measurementNode.getValue().getSourceTime().getUtcTime() <
utcTimestamp) {
+ measurementNode.setValue(dataValue);
}
} else {
valueNode = measurementNode;
@@ -342,6 +284,15 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
timestamp = utcTimestamp;
}
}
+ if (Objects.nonNull(valueNode)) {
+ if (Objects.isNull(valueNode.getValue())
+ || Objects.isNull(valueNode.getValue().getSourceTime())
+ || valueNode.getValue().getSourceTime().getUtcTime() < timestamp) {
+ valueNode.setValue(
+ new DataValue(
+ new Variant(value), currentQuality, new DateTime(timestamp),
new DateTime()));
+ }
+ }
}
private static Object getTabletObjectValue4Opc(
@@ -388,23 +339,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
.createEvent(
new NodeId(getNamespaceIndex(), UUID.randomUUID()),
Identifiers.BaseEventType);
- List<String> sourceNameList = null;
- if (isTableModel) {
- sourceNameList = new ArrayList<>(tablet.getRowSize());
- for (int i = 0; i < tablet.getRowSize(); ++i) {
- final StringBuilder idBuilder = new
StringBuilder(sink.getDatabaseName());
- for (final Object segment : tablet.getDeviceID(i).getSegments()) {
- idBuilder
- .append(TsFileConstant.PATH_SEPARATOR)
- .append(Objects.isNull(segment) ? sink.getPlaceHolder4NullTag()
: segment);
- }
- sourceNameList.add(idBuilder.toString());
- }
- }
-
// Use eventNode here because other nodes doesn't support values and times
simultaneously
for (int columnIndex = 0; columnIndex < tablet.getSchemas().size();
++columnIndex) {
-
final TSDataType dataType =
tablet.getSchemas().get(columnIndex).getType();
// Source name --> Sensor path, like root.test.d_0.s_0
@@ -440,8 +376,8 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
case DATE:
eventNode.setMessage(
LocalizedText.english(
- (((LocalDate[]) tablet.values[columnIndex])[rowIndex])
- .atStartOfDay(ZoneId.systemDefault())
+ ((LocalDate[]) tablet.values[columnIndex])
+ [rowIndex].atStartOfDay(ZoneId.systemDefault())
.toString()));
break;
case INT64:
@@ -506,7 +442,6 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
case STRING:
return Identifiers.String;
case VECTOR:
- case OBJECT:
case UNKNOWN:
default:
throw new PipeRuntimeNonCriticalException("Unsupported data type: " +
type);