This is an automated email from the ASF dual-hosted git repository. cgarcia pushed a commit to branch UpVer0.13.1 in repository https://gitbox.apache.org/repos/asf/plc4x-extras.git
commit 83c07444c198d62dfff42139f882e6ea56b3f638 Author: César José García León <[email protected]> AuthorDate: Thu Nov 20 10:11:31 2025 -0400 Implementing pbraw RT and pbraw IoTDB wrapper.[ --- plc4j/tools/merlot/merlot.iml | 15 + .../merlot/org.apache.plc4x.merlot.archive/pom.xml | 175 +++++++++++ .../main/cfg/org.apache.plc4x.merlot.archive.cfg | 23 ++ .../org.apache.plc4x.merlot.decanter-grafana.cfg | 45 +++ .../cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg | 44 +++ .../src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg | 39 +++ .../plc4x/merlot/archive/api/BrokerService.java | 25 ++ .../plc4x/merlot/archive/api/MerlotAppender.java | 30 ++ .../plc4x/merlot/archive/api/MerlotCollector.java | 70 +++++ .../archive/api/MerlotDataBrowserSupport.java | 26 ++ .../merlot/archive/api/MerlotDecanterFactory.java | 29 ++ .../plc4x/merlot/archive/api/MerlotGPClient.java | 209 +++++++++++++ .../merlot/archive/api/MerlotIoTDBWrapper.java | 25 ++ .../archive/core/MerlotDecanterManagedService.java | 114 +++++++ .../archive/core/MerlotHtcManagedService.java | 31 ++ .../merlot/archive/impl/BrokerServiceImpl.java | 68 +++++ .../archive/impl/MerlotDataBrowserSupportImpl.java | 80 +++++ .../merlot/archive/impl/MerlotGPClientImpl.java | 276 +++++++++++++++++ .../archive/impl/MerlotMqttAppenderFactory.java | 51 ++++ .../archive/impl/MerlotMqttAppenderImpl.java | 175 +++++++++++ .../archive/impl/MerlotPvHtcCollectorImpl.java | 328 +++++++++++++++++++++ .../archive/impl/MerlotPvRtCollectorImpl.java | 296 +++++++++++++++++++ .../OSGI-INF/blueprint/decanter-service.xml | 117 ++++++++ .../nbproject/project.properties | 0 .../apache/plc4x/merlot/db/core/DBTestSuite.java | 36 +++ .../nbproject/project.properties | 0 26 files changed, 2327 insertions(+) diff --git a/plc4j/tools/merlot/merlot.iml b/plc4j/tools/merlot/merlot.iml new file mode 100644 index 0000000..452f210 --- /dev/null +++ b/plc4j/tools/merlot/merlot.iml @@ -0,0 +1,15 @@ +<?xml version="1.0" encoding="UTF-8"?> +<module version="4"> + <component name="FacetManager"> + <facet type="web" name="Web"> + <configuration> + <descriptors> + <deploymentDescriptor name="web.xml" url="file://$MODULE_DIR$/org.apache.plc4x.merlot.ui/src/main/webapp/WEB-INF/web.xml" /> + </descriptors> + <webroots> + <root url="file://$MODULE_DIR$/org.apache.plc4x.merlot.ui/src/main/webapp" relative="/" /> + </webroots> + </configuration> + </facet> + </component> +</module> \ No newline at end of file diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml new file mode 100644 index 0000000..b3f36d5 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml @@ -0,0 +1,175 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>merlot</artifactId> + <groupId>org.apache.plc4x</groupId> + <version>0.13.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.plc4x.merlot.archive</groupId> + <artifactId>org.apache.plc4x.merlot.archive</artifactId> + <version>0.13.0-SNAPSHOT</version> + <packaging>bundle</packaging> + + <name>PLC4J: Merlot :: archive :: Historical archive </name> + <description>decanter OSGi blueprint bundle project.</description> + + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>5.1.9</version> + <extensions>true</extensions> + <configuration> + <instructions> + <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName> + <Bundle-Version>${project.version}</Bundle-Version> + <Export-Package>org.apache.plc4x.merlot.archive*;version=${project.version}</Export-Package> + <Import-Package>org.epics.gpclient.datasource.sim, + org.epics.gpclient.datasource.pva, + com.sun.net.httpserver.*, + * + </Import-Package> + <Karaf-Commands>org.apache.plc4x.merlot.api.command*</Karaf-Commands> + <!--<_removeheaders>Import-Service,Export-Service</_removeheaders>--> + <SPI-Consumer>*</SPI-Consumer> + </instructions> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>17</source> + <target>17</target> + <maxmem>256M</maxmem> + </configuration> + </plugin> + + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>attach-artifact</goal> + </goals> + <configuration> + <artifacts> + <artifact> + <file>src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg</file> + <type>cfg</type> + </artifact> + <artifact> + <file>src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg</file> + <type>cfg2</type> + </artifact> + <artifact> + <file>src/main/cfg/org.apache.plc4x.merlot.archive.cfg</file> + <type>cfg3</type> + </artifact> + </artifacts> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>2.0.16</version> + </dependency> + <dependency> + <groupId>org.eclipse.paho</groupId> + <artifactId>org.eclipse.paho.client.mqttv3</artifactId> + <version>1.2.5</version> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>osgi.core</artifactId> + <version>8.0.0</version> + </dependency> + <dependency> + <groupId>org.apache.plc4x.merlot.scheduler</groupId> + <artifactId>org.apache.plc4x.merlot.scheduler</artifactId> + <version>0.13.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.osgi</groupId> + <artifactId>org.osgi.compendium</artifactId> + <version>5.0.0</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>gpclient</artifactId> + <version>1.0.8</version> + <type>pom</type> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>gpclient-pva</artifactId> + <version>1.0.8</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>gpclient-sim</artifactId> + <version>1.0.8</version> + </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>3.17.0</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>epics-ntypes</artifactId> + <version>0.3.9</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>vtype</artifactId> + <version>1.0.7</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>epics-vtype-all</artifactId> + <version>1.0.7</version> + <type>pom</type> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>vtype-json</artifactId> + <version>2.9.0</version> + </dependency> + <dependency> + <groupId>org.epics</groupId> + <artifactId>gpclient-core</artifactId> + <version>1.0.8</version> + </dependency> + </dependencies> +</project> diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg new file mode 100644 index 0000000..4a56335 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg @@ -0,0 +1,23 @@ +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +parametro_01 = 01 +parametro_02 = 02 +parametro_03 = 03 + diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg new file mode 100644 index 0000000..c856a67 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg @@ -0,0 +1,45 @@ +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +###################################### +# Decanter MQTT Appender Configuration +###################################### + +#Factory to build the EventHandler +#factory = mqtt-appender + +#Decanter event filter +#eventtopic = decanter/grafana/* + +# Location of the MQTT server +#server = tcp://localhost:1883 +#username = merlot +#password = merlot + +# MQTT client ID for the appender +#clientId = d:decanter:appender:default + +# MQTT topic where to send the collected events +#topic = decanter + +# Marshaller to use +#marshaller.target = (dataFormat=json) + +# WatchDog Timer (msec) +#watchdogtime = 10000; diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg new file mode 100644 index 0000000..79ee1e8 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg @@ -0,0 +1,44 @@ +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +###################################### +# Decanter MQTT Appender Configuration +###################################### +#Factory to build the EventHandler +#factory = mqtt-appender + +#Decanter event filter +#eventtopic = decanter/htc/* + +# Location of the MQTT server +#server = tcp://localhost:1883 +#username = merlot +#password = merlot + +# MQTT client ID for the appender +#clientId = d:decanter:appender:default + +# MQTT topic where to send the collected events +#topic = decanter + +# Marshaller to use +#marshaller.target = (dataFormat=json) + +# WatchDog Timer (msec) +#watchdogtime = 10000; diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg new file mode 100644 index 0000000..31ba8b1 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg @@ -0,0 +1,39 @@ +################################################################################ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +################################################################################ + +################################################################################ +# /Group collection ID +# | /Scan time seconds +# | | +# G = 1 +################################################################################ +#G0001=1 + +################################################################################ +# / Historical collection ID +# | / Process value +# | | / Group scan time seconds +# | | | / Hysterisis in % +# | | | | / Delta value to register the PV +# | | | | | / Device for IoTB +# | | | | | | / tag of mesurement +# | | | | | | | +# PV = pva://UNIDAD:CELDA:NIVEL/value;G0001;5;device;measurements +################################################################################ +#PV0001=pva://UNIDAD:CELDA:NIVEL/value;G0001;5;root.system.tank;20lt01 diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java new file mode 100644 index 0000000..04e239c --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + +public interface BrokerService { + + public void init(); + + public void destroy(); + +} \ No newline at end of file diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java new file mode 100644 index 0000000..e82ae33 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + +import org.apache.plc4x.merlot.scheduler.api.Job; +import org.osgi.service.cm.ManagedService; +import org.osgi.service.event.EventHandler; + + +public interface MerlotAppender extends EventHandler, Job { + + public void init(); + + public void destroy(); + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java new file mode 100644 index 0000000..b843e32 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + + +public interface MerlotCollector { + + public void init(); + + + public void destroy(); + + /* + * Start the scan of all groups + */ + public void start(); + + /* + * Stop the scan of all groups + */ + public void stop(); + + /* + * Add un scan group to the collector + * @param strGroup the group name, must be unique. + * @param args[0] the scan time in seconds + */ + public void addGroup(String strGroup, String... args); + + /* + * Remove the scan group to the collector + * @param strGroup the group name, must be unique. + */ + public void removeGroup(String strGroup); + + /* + * Reschedule the group, inte second specify in scanTime + * @param strGroup the group name, must be unique. + * @param scanTime the scan time in seconds + */ + public void schedulerGroup(String strGroup, int scanTime); + + /* + * Reschedule the group, inte second specify in scanTime + * @param strGroup the group name, must be unique. + * @param args[0] the scan time in seconds + */ + public void putPvRecord(String strPvName, String... args); + + /* + * Remove the PvRecord from group. + * @param strGroup the group name, must be unique. + */ + public void removePvRecord(String strPvName); + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java new file mode 100644 index 0000000..c2a21f5 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + + +public interface MerlotDataBrowserSupport { + + public void init(); + + public void destroy(); + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java new file mode 100644 index 0000000..01e6a0f --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + +import java.util.Dictionary; +import java.util.Optional; +import org.osgi.service.event.EventHandler; + + +public interface MerlotDecanterFactory { + + Optional<MerlotAppender> createBundle(Dictionary<String, ?> props); + + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java new file mode 100644 index 0000000..8291996 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + +import java.util.List; +import java.util.concurrent.Future; +import org.epics.gpclient.CollectorExpression; +import org.epics.gpclient.Expression; +import org.epics.gpclient.GPClientInstance; +import org.epics.gpclient.PVConfiguration; +import org.epics.gpclient.PVReaderConfiguration; +import org.epics.gpclient.ReadCollector; +import org.epics.gpclient.WriteCollector; +import org.epics.vtype.VType; + +/** +* This is an implementation of GPClient that solves the import +* of SPI services of the DataSourceProvider type during Merlot restart. +* +* TODO: Evaluate the original GPClient implementation to +* solve the service loading problem. +*/ +public interface MerlotGPClient { + + /** + * DataSources are created from the DataSourceProvider services + * available in the CLASSPATH. + */ + public void init(); + + /** + * + */ + public void destroy(); + + /** + * Reads the value of the given expression, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the future value + */ + public Future<VType> readOnce(String channelName); + + /** + * Reads the value of the given expression. + * + * @param <R> the read type + * @param expression the expression to read + * @return the future value + */ + public <R> Future<R> readOnce(Expression<R, ?> expression); + + /** + * Reads the channel with the given name, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the configuration options + */ + public PVReaderConfiguration<VType> read(String channelName); + + /** + * Reads the given expression. + * + * @param <R> the read type + * @param expression the expression to read + * @return the configuration options + */ + public <R> PVReaderConfiguration<R> read(Expression<R, ?> expression); + + /** + * Reads and writes the channel with the given name, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the configuration options + */ + public PVConfiguration<VType, Object> readAndWrite(String channelName); + + /** + * Reads and writes the given expression. + * + * @param <R> the read type + * @param <W> the write type + * @param expression the expression to read and write + * @return the configuration options + */ + public <R, W> PVConfiguration<R, W> readAndWrite(Expression<R, W> expression); + + /** + * Keep only the latest value from the channel. + * <p> + * In case of data bursts (i.e. data coming in at rate faster than the + * reader can handle) this strategy will skip the notification in between, + * but always notify on the last value. + * + * @param <R> the type to read + * @param readType the type to read + * @return the caching strategy + */ + public <R> ReadCollector<R, R> cacheLastValue(Class<R> readType); + + /** + * Return all the values queued from the last update. + * <p> + * In case of data bursts (i.e. data coming in at rate faster than the + * reader can handle) this strategy will combine the notifications and + * return all the values. + * + * @param <R> the type to read + * @param readType the type to read + * @return the caching strategy + */ + public <R> ReadCollector<R, List<R>> queueAllValues(Class<R> readType); + + /** + * A write buffer for the the given type. + * + * @param <W> the type to write + * @param writeType the type to write + * @return the caching strategy + */ + public <W> WriteCollector<W> writeType(Class<W> writeType); + + /** + * A channel that reads and writes the given data types with the given strategy. + * + * @param <R> the type to read + * @param <W> the type to write + * @param channelName the name of the channel + * @param readCollector the read buffer + * @param writeCollector the write buffer + * @return a new channel expression + */ + public <R, W> Expression<R, W> channel(String channelName, ReadCollector<?, R> readCollector, WriteCollector<W> writeCollector); + + /** + * A channel that reads the given data type with the given strategy. + * + * @param <R> the type to read + * @param channelName the name of the channel + * @param readCollector the read buffer + * @return a new channel expression + */ + public <R> Expression<R, Object> channel(String channelName, ReadCollector<?, R> readCollector); + + /** + * A channel that reads {@link VType}s caching the latest value. + * + * @param channelName the name of the channel + * @return a new channel expression + */ + public Expression<VType, Object> channel(String channelName); + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @param <R> the type to read + * @param <C> the type to collect + * @param <W> the type to write + * @param readCollector the read buffer + * @param writeCollector the write buffer + * @return a new collector expression + */ + public <R, C, W> CollectorExpression<R, C, W> collector(ReadCollector<C, R> readCollector, WriteCollector<W> writeCollector); + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @param <R> the type to read + * @param <C> the type to collect + * @param readCollector the read buffer + * @return a new collector expression + */ + public <R, C> CollectorExpression<R, C, Object> collector(ReadCollector<C, R> readCollector); + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @return a new collector expression + */ + public CollectorExpression<VType, VType, Object> collector(); + + /** + * The default instance of the general purpose client. + * + * @return the default instance + */ + public GPClientInstance defaultInstance(); + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java new file mode 100644 index 0000000..e780304 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.api; + + +public interface MerlotIoTDBWrapper { + + public void init(); + + public void destroy(); +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java new file mode 100644 index 0000000..3f8e253 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.core; + +import java.util.Dictionary; +import java.util.Hashtable; +import java.util.Optional; +import org.apache.plc4x.merlot.archive.api.MerlotAppender; +import org.apache.plc4x.merlot.archive.api.MerlotDecanterFactory; +import org.apache.plc4x.merlot.scheduler.api.Job; +import org.apache.plc4x.merlot.scheduler.api.JobContext; +import org.apache.plc4x.merlot.scheduler.api.Scheduler; +import org.osgi.framework.BundleContext; +import org.osgi.framework.ServiceReference; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedServiceFactory; +import org.osgi.service.event.EventConstants; +import org.osgi.service.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MerlotDecanterManagedService implements ManagedServiceFactory, Job { + private static final Logger LOGGER = LoggerFactory.getLogger(MerlotDecanterManagedService.class); + + private final BundleContext ctx; + + public MerlotDecanterManagedService(BundleContext ctx) { + this.ctx = ctx; + } + + @Override + public void updated(String pid, Dictionary<String, ?> props) throws ConfigurationException { + String strFactory = (String) props.get("factory"); + + if (null != strFactory) { + MerlotDecanterFactory factory = getFactory(strFactory); + if (null != factory) { + Optional<MerlotAppender> optBundle = factory.createBundle(props); + if (optBundle.isPresent()) { + //Register the decanter service + optBundle.get().init(); + String strEventTopic = (String) props.get("eventtopic"); + Dictionary<String, String> properties = new Hashtable<>(); + + properties.put(EventConstants.EVENT_TOPIC, strEventTopic); + properties.put(Scheduler.PROPERTY_SCHEDULER_NAME, "MerlotAppender:" + getValue(props, "clientId")); + properties.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, getValue(props, "watchdogtime")); + properties.put(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE, "true"); + properties.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, "false"); + + ctx.registerService(new String[]{MerlotAppender.class.getName(), EventHandler.class.getName(), Job.class.getName()} , optBundle.get(), properties); + + } else { + LOGGER.info("Bundle for [" + strFactory +"] not present."); + } + + } else { + LOGGER.info("Factory service [" + strFactory +"] don't found."); + } + } else { + LOGGER.info("Factory string [" + strFactory +"] don't found in config file."); + } + } + + @Override + public void execute(JobContext context) { + //LOGGER.info("EXECUTE"); + } + + @Override + public String getName() { + LOGGER.info("GET NAME"); + return "Merlot-Decanter"; + } + + @Override + public void deleted(String pid) { + LOGGER.info("DELETE"); + } + + private String getValue(Dictionary<String, ?> props, String strKey){ + return (null == props.get(strKey))? "" : (String) props.get(strKey); + } + + private MerlotDecanterFactory getFactory(String strFactory){ + try{ + String filterdriver = "(org.plc4x.merlot.decanter.factory=" + strFactory + ")"; + ServiceReference[] refdrvs = ctx.getAllServiceReferences(MerlotDecanterFactory.class.getName(), filterdriver); + MerlotDecanterFactory refDev = (MerlotDecanterFactory) ctx.getService(refdrvs[0]); + if (refDev == null) LOGGER.info("Device [" + strFactory + "] don't found"); + return refDev; + } catch (Exception ex){ + LOGGER.error("getDevice: " + ex.toString()); + } + return null; + } + + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java new file mode 100644 index 0000000..d6d00e1 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.core; + +import java.util.Dictionary; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedService; + + +public class MerlotHtcManagedService implements ManagedService { + + @Override + public void updated(Dictionary<String, ?> properties) throws ConfigurationException { + System.out.println("CONFIGURING HTC"); + } + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java new file mode 100644 index 0000000..c909527 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.plc4x.merlot.archive.impl; + + +import java.util.logging.Level; +import org.apache.plc4x.merlot.archive.api.BrokerService; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class BrokerServiceImpl implements BrokerService { + private static final Logger LOGGER = LoggerFactory.getLogger(BrokerServiceImpl.class); + + private MqttClient client; + + @Override + public void init() { + System.out.println(">>>> Init <"); + try { + client = new MqttClient("tcp://10.10.1.104:1883", "clientId", new MemoryPersistence()); + MqttConnectOptions options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setUserName("merlot"); + options.setPassword("merlot".toCharArray()); + client.connect(options); + + MqttMessage message = new MqttMessage(); + + message.setPayload("esto es una prueba".getBytes()); + + for (int i=0; i<100; i++){ + client.publish("prueba", message); + } + + + } catch (MqttException ex) { + ex.printStackTrace(); + } + + } + + @Override + public void destroy() { + System.out.println("Destroy"); + } + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java new file mode 100644 index 0000000..514882a --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.impl; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import org.apache.plc4x.merlot.archive.api.MerlotGPClient; +import org.apache.plc4x.merlot.scheduler.api.Scheduler; +import org.osgi.service.event.EventAdmin; +import org.slf4j.LoggerFactory; + + +public class MerlotDataBrowserSupportImpl extends MerlotPvHtcCollectorImpl { + + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotDataBrowserSupportImpl.class); + + private HttpServer server; + + + public MerlotDataBrowserSupportImpl(Scheduler scheduler, EventAdmin eventAdmin, MerlotGPClient gpClient) { + super(scheduler, eventAdmin, gpClient); + } + + @Override + public void start() { + super.start(); + try { + server = HttpServer.create(new InetSocketAddress(2000), 0); + // Create a context for a specific path and set the handler + server.createContext("/request/bpl/searchForPVsRegex", new MyHandler()); + server.setExecutor(null); // Use the default executor + server.start(); + System.out.println("Server is running on port 8000"); + } catch (IOException e) { + System.out.println("Error starting the server: " + e.getMessage()); + } + } + + @Override + public void stop() { + super.stop(); + server.stop(10); + } + + // Define a custom HttpHandler + static class MyHandler implements HttpHandler { + @Override + public void handle(HttpExchange exchange) throws IOException + { + // Handle the request + System.out.println("Protocol: " + exchange.getProtocol()); + System.out.println("Method : " + exchange.getRequestMethod()); + System.out.println("URI : " + exchange.getRequestURI().toString()); + String response = "uno\ndos\ntres"; + exchange.sendResponseHeaders(200, response.length()); + OutputStream os = exchange.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } + } + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java new file mode 100644 index 0000000..a4acd6a --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.plc4x.merlot.archive.impl; + +import java.time.Duration; +import java.util.List; +import java.util.ServiceLoader; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import org.apache.plc4x.merlot.archive.api.MerlotGPClient; +import org.epics.gpclient.CollectorExpression; +import org.epics.gpclient.Expression; +import org.epics.gpclient.GPClient; +import org.epics.gpclient.GPClientConfiguration; +import org.epics.gpclient.GPClientInstance; +import org.epics.gpclient.PVConfiguration; +import org.epics.gpclient.PVReaderConfiguration; +import org.epics.gpclient.ReadCollector; +import org.epics.gpclient.WriteCollector; +import org.epics.gpclient.datasource.CompositeDataSource; +import org.epics.gpclient.datasource.DataSourceProvider; +import org.epics.vtype.VType; + +/** + * + * @author cgarcia + */ +public class MerlotGPClientImpl implements MerlotGPClient { + + private GPClientInstance gpClient; + + + @Override + public void init() { + ServiceLoader<DataSourceProvider> ldr = ServiceLoader.load(DataSourceProvider.class); + CompositeDataSource cds = new CompositeDataSource(); + for (DataSourceProvider spiObject : ldr) { + cds.putDataSource(spiObject.getName(), spiObject.createInstance()); + } + + this.gpClient = new GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50)) + .notificationExecutor(org.epics.util.concurrent.Executors.localThread()) + .dataSource(cds) + .dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() - 1), + org.epics.util.concurrent.Executors.namedPool("MerlotGPClient Worker "))).build(); + } + + @Override + public void destroy() { + gpClient.getDefaultDataSource().getChannels().clear(); + } + + /** + * Reads the value of the given expression, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the future value + */ + @Override + public Future<VType> readOnce(String channelName) { + return gpClient.readOnce(channelName); + } + + /** + * Reads the value of the given expression. + * + * @param <R> the read type + * @param expression the expression to read + * @return the future value + */ + @Override + public <R> Future<R> readOnce(Expression<R, ?> expression) { + return gpClient.readOnce(expression); + } + + /** + * Reads the channel with the given name, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the configuration options + */ + @Override + public PVReaderConfiguration<VType> read(String channelName) { + return gpClient.read(channelName); + } + + /** + * Reads the given expression. + * + * @param <R> the read type + * @param expression the expression to read + * @return the configuration options + */ + @Override + public <R> PVReaderConfiguration<R> read(Expression<R, ?> expression) { + return gpClient.read(expression); + } + + /** + * Reads and writes the channel with the given name, asking for {@link VType} values. + * + * @param channelName the name of the channel + * @return the configuration options + */ + @Override + public PVConfiguration<VType, Object> readAndWrite(String channelName) { + return gpClient.readAndWrite(channelName); + } + + /** + * Reads and writes the given expression. + * + * @param <R> the read type + * @param <W> the write type + * @param expression the expression to read and write + * @return the configuration options + */ + @Override + public <R, W> PVConfiguration<R, W> readAndWrite(Expression<R, W> expression) { + return gpClient.readAndWrite(expression); + } + + /** + * Keep only the latest value from the channel. + * <p> + * In case of data bursts (i.e. data coming in at rate faster than the + * reader can handle) this strategy will skip the notification in between, + * but always notify on the last value. + * + * @param <R> the type to read + * @param readType the type to read + * @return the caching strategy + */ + @Override + public <R> ReadCollector<R, R> cacheLastValue(Class<R> readType) { + return GPClient.cacheLastValue(readType); + } + + /** + * Return all the values queued from the last update. + * <p> + * In case of data bursts (i.e. data coming in at rate faster than the + * reader can handle) this strategy will combine the notifications and + * return all the values. + * + * @param <R> the type to read + * @param readType the type to read + * @return the caching strategy + */ + @Override + public <R> ReadCollector<R, List<R>> queueAllValues(Class<R> readType) { + return GPClient.queueAllValues(readType); + } + + /** + * A write buffer for the the given type. + * + * @param <W> the type to write + * @param writeType the type to write + * @return the caching strategy + */ + @Override + public <W> WriteCollector<W> writeType(Class<W> writeType) { + return GPClient.writeType(writeType); + } + + /** + * A channel that reads and writes the given data types with the given strategy. + * + * @param <R> the type to read + * @param <W> the type to write + * @param channelName the name of the channel + * @param readCollector the read buffer + * @param writeCollector the write buffer + * @return a new channel expression + */ + @Override + public <R, W> Expression<R, W> channel(String channelName, ReadCollector<?, R> readCollector, WriteCollector<W> writeCollector) { + return GPClient.channel(channelName, readCollector, writeCollector); + } + + /** + * A channel that reads the given data type with the given strategy. + * + * @param <R> the type to read + * @param channelName the name of the channel + * @param readCollector the read buffer + * @return a new channel expression + */ + @Override + public <R> Expression<R, Object> channel(String channelName, ReadCollector<?, R> readCollector) { + return GPClient.channel(channelName, readCollector); + } + + /** + * A channel that reads {@link VType}s caching the latest value. + * + * @param channelName the name of the channel + * @return a new channel expression + */ + @Override + public Expression<VType, Object> channel(String channelName) { + return GPClient.channel(channelName); + } + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @param <R> the type to read + * @param <C> the type to collect + * @param <W> the type to write + * @param readCollector the read buffer + * @param writeCollector the write buffer + * @return a new collector expression + */ + @Override + public <R, C, W> CollectorExpression<R, C, W> collector(ReadCollector<C, R> readCollector, WriteCollector<W> writeCollector) { + return GPClient.collector(readCollector, writeCollector); + } + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @param <R> the type to read + * @param <C> the type to collect + * @param readCollector the read buffer + * @return a new collector expression + */ + @Override + public <R, C> CollectorExpression<R, C, Object> collector(ReadCollector<C, R> readCollector) { + return GPClient.collector(readCollector); + } + + /** + * An expression that allows to directly send/receive values to/from + * PVReaders/PVWriters. This can be used for testing purpose or to integrate + * data models that do not fit datasources or services. + * + * @return a new collector expression + */ + @Override + public CollectorExpression<VType, VType, Object> collector() { + return collector(cacheLastValue(VType.class)); + } + + /** + * The default instance of the general purpose client. + * + * @return the default instance + */ + @Override + public GPClientInstance defaultInstance() { + return gpClient; + } + + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java new file mode 100644 index 0000000..4dc29f0 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.impl; + +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.Optional; +import org.apache.plc4x.merlot.archive.api.MerlotAppender; +import org.apache.plc4x.merlot.archive.api.MerlotDecanterFactory; + + + +public class MerlotMqttAppenderFactory implements MerlotDecanterFactory { + + @Override + public Optional<MerlotAppender> createBundle(Dictionary<String, ?> props) { + MerlotAppender appender = null; + + appender = new MerlotMqttAppenderImpl.Builder(). + ServerUri(getValue(props,"server")). + ClientId(getValue(props,"clientId")). + Topic(getValue(props,"topic")). + UserName(getValue(props,"username")). + Password(getValue(props,"password")). + EventTopic(getValue(props,"eventtopic")). + MarshallerTarget(getValue(props,"marshaller.target")). + WatchDogTime(getValue(props,"watchdogtime")).build(); + + return Optional.of(appender); + + } + + private String getValue(Dictionary<String, ?> props, String strKey){ + return (null == props.get(strKey))? "" : (String) props.get(strKey); + } + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java new file mode 100644 index 0000000..29897a2 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.impl; + +import java.util.logging.Level; +import org.apache.plc4x.merlot.archive.api.MerlotAppender; +import org.apache.plc4x.merlot.scheduler.api.JobContext; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class MerlotMqttAppenderImpl implements MerlotAppender { + private static final Logger LOGGER = LoggerFactory.getLogger(MerlotMqttAppenderImpl.class); + + private final String strServerUri; + private final String strClientId; + private final String strTopic; + private final String strUserName; + private final String strPassword; + private final String strEventTopic; + private final String strMarshallerTarget; + private final String watchdogTime; + + private MqttConnectOptions options; + private MqttClient client; + private MqttMessage message; + + MerlotMqttAppenderImpl(Builder build) { + this.strServerUri = build.strServerUri; + this.strClientId = build.strClientId; + this.strTopic = build.strTopic; + this.strUserName = build.strUserName; + this.strPassword = build.strPassword; + this.strEventTopic = build.strEventTopic; + this.strMarshallerTarget = build.strMarshallerTarget; + this.watchdogTime = build.watchdogTime; + } + + + @Override + public void init() { + try { + message = new MqttMessage(); + client = new MqttClient(strServerUri, strClientId, new MemoryPersistence()); + options = new MqttConnectOptions(); + options.setCleanSession(true); + options.setUserName(strUserName); + options.setPassword(strPassword.toCharArray()); + client.connect(options); + if (client.isConnected()) { + LOGGER.info("Conected."); + } else { + LOGGER.info("Not conected."); + } + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + } + + @Override + public void destroy() { + if (null != client) + try { + client.close(); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + } + + @Override + public void handleEvent(Event event) { + if ((null != client) & (client.isConnected())) { + final String strPayload = (String) event.getProperty("value"); + + if (null == message) { + message = new MqttMessage(); + } + + message.setPayload(strPayload.getBytes()); + + try { + client.publish((String) event.getProperty("tag"), message); + } catch (MqttException ex) { + LOGGER.error(ex.toString()); + } + } + } + + @Override + public void execute(JobContext context) { + if ((null == client) || (!client.isConnected())){ + init(); + } + } + + + public static class Builder { + private String strServerUri; + private String strClientId; + private String strTopic; + private String strUserName; + private String strPassword; + private String strEventTopic; + private String strMarshallerTarget; + private String watchdogTime; + + public Builder ServerUri(String strServerUri){ + this.strServerUri = strServerUri; + return this; + } + + public Builder ClientId(String strClientId){ + this.strClientId = strClientId; + return this; + } + + public Builder Topic(String strTopic){ + this.strTopic = strTopic; + return this; + } + + public Builder UserName(String strUserName){ + this.strUserName = strUserName; + return this; + } + + public Builder Password(String strPassword){ + this.strPassword = strPassword; + return this; + } + + public Builder EventTopic(String strEventTopic){ + this.strEventTopic = strEventTopic; + return this; + } + + public Builder WatchDogTime(String watchdogTime){ + this.watchdogTime = watchdogTime; + return this; + } + + public Builder MarshallerTarget(String strMarshallerTarget){ + this.strMarshallerTarget = strMarshallerTarget; + return this; + } + + public MerlotMqttAppenderImpl build() { + return new MerlotMqttAppenderImpl (this); + } + + + } + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java new file mode 100644 index 0000000..94f620f --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java @@ -0,0 +1,328 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.impl; + +import java.io.StringWriter; +import java.time.Duration; +import java.time.Instant; +import java.util.Date; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.HashMap; +import java.util.Hashtable; +import java.util.Map; +import java.util.ServiceLoader; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.json.Json; +import javax.json.JsonObject; +import javax.json.JsonWriter; +import javax.json.JsonWriterFactory; +import javax.json.stream.JsonGenerator; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.plc4x.merlot.archive.api.MerlotCollector; +import org.apache.plc4x.merlot.archive.api.MerlotGPClient; +import org.apache.plc4x.merlot.scheduler.api.Job; +import org.apache.plc4x.merlot.scheduler.api.JobContext; +import org.apache.plc4x.merlot.scheduler.api.ScheduleOptions; +import org.apache.plc4x.merlot.scheduler.api.Scheduler; +import org.epics.gpclient.GPClient; +import org.epics.gpclient.GPClientConfiguration; +import org.epics.gpclient.GPClientInstance; +import org.epics.gpclient.PVEvent; +import org.epics.gpclient.PVEventRecorder; +import org.epics.gpclient.PVReader; +import org.epics.gpclient.PVReaderListener; +import org.epics.gpclient.datasource.CompositeDataSource; +import org.epics.gpclient.datasource.DataSourceProvider; +import org.epics.vtype.VNumber; +import org.epics.vtype.VType; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedService; +import org.osgi.service.cm.ManagedServiceFactory; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventProperties; +import org.slf4j.LoggerFactory; + + +public class MerlotPvHtcCollectorImpl implements MerlotCollector, ManagedServiceFactory, PVReaderListener { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotPvHtcCollectorImpl.class); + private static final String HTC_ROUTE = "decanter/collector/htc"; + private static final Pattern GROUP_INDEX_PATTERN = + Pattern.compile("^HG(?<groupIndex>\\d{4})"); + private static final Pattern PV_INDEX_PATTERN = + Pattern.compile("^PV(?<groupIndex>\\d{4})"); + + protected static final String GROUP_INDEX = "groupIndex"; + + private final Scheduler scheduler; + private final EventAdmin eventAdmin; + private final MerlotGPClient gpClient; + private final Map<String, SchedulerGroup> groups = new ConcurrentHashMap<>(); + private final Map<String, MutablePair<SchedulerGroup, PVReader<VType>>> pvs = new ConcurrentHashMap<>(); + + + + public MerlotPvHtcCollectorImpl(Scheduler scheduler, EventAdmin eventAdmin, MerlotGPClient gpClient) { + this.scheduler = scheduler; + this.eventAdmin = eventAdmin; + this.gpClient = gpClient; + } + + + @Override + public void init() { +// ServiceLoader<DataSourceProvider> ldr = ServiceLoader.load(DataSourceProvider.class); +// CompositeDataSource cds = new CompositeDataSource(); +// for (DataSourceProvider spiObject : ldr) { +// cds.putDataSource(spiObject.getName(), spiObject.createInstance()); +// } +// +// cds.getDataSourceProviders().forEach((s,d) -> { System.out.println(">> " + s); }); +// +// this.gpCLient = new GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50)) +// .notificationExecutor(org.epics.util.concurrent.Executors.localThread()) +// .dataSource(cds) +// .dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() - 1), +// org.epics.util.concurrent.Executors.namedPool("PVMgr HTC Worker "))).build(); + } + + @Override + public void destroy() { + + } + + @Override + public void stop() { + groups.forEach((g, o) -> { + scheduler.unschedule(g); + }); + } + + @Override + public void start() { + groups.forEach((g, o) -> { + try { + scheduler.schedule(o, o.getScheduleOptions()); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + }); + } + + @Override + public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException { + Matcher matcher; + String strObject; + String strKey; + String strValue; + + stop(); + groups.clear(); + + if (null == properties) return; + //Group Section + Enumeration<String> enumKeys = properties.keys(); + while(enumKeys.hasMoreElements()) { + strKey = enumKeys.nextElement(); + strValue = (String) properties.get(strKey); + if ((matcher = GROUP_INDEX_PATTERN.matcher(strKey)).matches()) { + addGroup(strKey, strValue); + } + } + + //PV Section + enumKeys = properties.keys(); + while(enumKeys.hasMoreElements()) { + strKey = enumKeys.nextElement(); + strValue = (String) properties.get(strKey); + if ((matcher = PV_INDEX_PATTERN.matcher(strKey)).matches()) { + String[] fields = strValue.split(";"); + + final SchedulerGroup group = groups.get(fields[1]); + + if (null != group) { + PVInfo pvInfo = new PVInfo(); + pvInfo.strPv = fields[0]; + pvInfo.strGroup = fields[1]; + pvInfo.delta = Double.parseDouble(fields[2]); + pvInfo.strDevice= fields[3]; + pvInfo.strTag = fields[4]; + + PVEventRecorder recorder = new PVEventRecorder(); + PVReader<VType> pvr = gpClient.read(pvInfo.strPv). + addListener(recorder). + addReadListener(this). + start(); + LOGGER.info("Registered HTC Pv: " + pvInfo.strPv); + pvInfo.pvr = pvr; + pvInfo.lastValue = null; + group.addPvReader(strKey, pvInfo); + } + }; + } + } + + + @Override + public String getName() { + return "Merlot - htc"; + } + + @Override + public void deleted(String pid) { + LOGGER.info("Remove config: " + pid); + } + + @Override + public void addGroup(String strGroup, String... args) { + if (null != args) { + if (null != args[0]) { + Integer period = Integer.parseInt(args[0]) * 1000; + ScheduleOptions schOptions = scheduler.AT(Date.from(Instant.now()), -1, period); + schOptions.name(strGroup); + + SchedulerGroup group = new SchedulerGroup(eventAdmin, schOptions); + groups.put(strGroup, group); + + try { + scheduler.schedule(group, schOptions); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + + } + } + } + + @Override + public void removeGroup(String strGroup) { + scheduler.unschedule(strGroup); + groups.remove(strGroup); + } + + @Override + public void schedulerGroup(String strGroup, int scanTime) { + + } + + @Override + public void putPvRecord(String strPvName, String... args) { + if (null != args){ + if (null != args[0]) { + + } + } + } + + @Override + public void removePvRecord(String strPvName) { + + } + + @Override + public void pvChanged(PVEvent event, PVReader pvReader) { + if (event.isType(PVEvent.Type.EXCEPTION)) { + LOGGER.info("EVENT: " + event.toString()); + } + + } + + private class PVInfo { + public PVReader<VType> pvr; + public VNumber lastValue; + public String strPv; + public String strGroup; + public Double delta; + public String strDevice; + public String strTag; + } + + private class SchedulerGroup implements Job { + private final EventAdmin eventAdmin; + private final ScheduleOptions schOptions; + + private final Map<String, PVInfo> pvs = new ConcurrentHashMap<>(); + private Map<String, String> properties = new Hashtable(); + private VNumber value; + + private Map<String, Boolean> config = new HashMap<String, Boolean>(); + + public SchedulerGroup(EventAdmin eventAdmin, ScheduleOptions schOptions) { + this.eventAdmin = eventAdmin; + this.schOptions = schOptions; + } + + @Override + public void execute(JobContext context) { + pvs.forEach(new BiConsumer<String, PVInfo>() { + @Override + public void accept(String s, PVInfo pv) { + + if ((pv.pvr.isConnected()) && (!pv.pvr.isPaused())) { + + value = (VNumber) pv.pvr.getValue(); + if ((null == pv.lastValue) || !value.equals(pv.lastValue)) { + + Double actualValue = value.getValue().doubleValue(); + Double lastValue = (null == pv.lastValue)? 0 : pv.lastValue.getValue().doubleValue(); + + if ((Math.abs(actualValue - lastValue)) > pv.delta) { + pv.lastValue = value; + + long timeEpoch = value.getTime().getTimestamp().toEpochMilli(); + + String strValue = String.format("{\n" + + "\"device\":\"" + pv.strDevice +"\",\n" + + "\"timestamp\":\"%d\",\n" + + "\"measurements\":[\""+ pv.strTag + "\"],\n" + + "\"values\":[\"%f\"]\n" + + "}", timeEpoch, value.getValue().doubleValue() ); + + properties.clear(); + properties.put("tag", pv.strDevice); + properties.put("value", strValue); + + EventProperties eventProps = new EventProperties(properties); + + Event decanterEvent = new Event(HTC_ROUTE, properties); + eventAdmin.postEvent(decanterEvent); + } + + } + } + } + }); + + } + + public void addPvReader(final String strPVIndex, final PVInfo pvInfo){ + pvs.put(strPVIndex, pvInfo); + } + + public ScheduleOptions getScheduleOptions(){ + return this.schOptions; + } + + } + + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java new file mode 100644 index 0000000..4738933 --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.archive.impl; + +import java.time.Instant; +import java.util.Date; +import java.util.Dictionary; +import java.util.Enumeration; +import java.util.Hashtable; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiConsumer; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.apache.commons.lang3.tuple.MutablePair; +import org.apache.plc4x.merlot.archive.api.MerlotCollector; +import org.apache.plc4x.merlot.archive.api.MerlotGPClient; +import org.apache.plc4x.merlot.scheduler.api.Job; +import org.apache.plc4x.merlot.scheduler.api.JobContext; +import org.apache.plc4x.merlot.scheduler.api.ScheduleOptions; +import org.apache.plc4x.merlot.scheduler.api.Scheduler; +import org.epics.gpclient.PVEvent; +import org.epics.gpclient.PVEventRecorder; +import org.epics.gpclient.PVReader; +import org.epics.gpclient.PVReaderListener; +import org.epics.vtype.*; +import org.osgi.service.cm.ConfigurationException; +import org.osgi.service.cm.ManagedServiceFactory; +import org.osgi.service.event.Event; +import org.osgi.service.event.EventAdmin; +import org.osgi.service.event.EventProperties; +import org.slf4j.LoggerFactory; + + +public class MerlotPvRtCollectorImpl implements MerlotCollector, ManagedServiceFactory, PVReaderListener { + private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MerlotPvRtCollectorImpl.class); + private static final String HTC_ROUTE = "decanter/collector/rt"; + private static final Pattern GROUP_INDEX_PATTERN = + Pattern.compile("^RG(?<groupIndex>\\d{4})"); + private static final Pattern PV_INDEX_PATTERN = + Pattern.compile("^PV(?<groupIndex>\\d{4})"); + + protected static final String GROUP_INDEX = "groupIndex"; + + private final Scheduler scheduler; + private final EventAdmin eventAdmin; + private final MerlotGPClient gpClient; + private final Map<String, SchedulerGroup> groups = new ConcurrentHashMap<>(); + private final Map<String, MutablePair<SchedulerGroup, PVReader<VType>>> pvs = new ConcurrentHashMap<>(); + + public MerlotPvRtCollectorImpl(Scheduler scheduler, EventAdmin eventAdmin, MerlotGPClient gpClient) { + this.scheduler = scheduler; + this.eventAdmin = eventAdmin; + this.gpClient = gpClient; + } + + + @Override + public void init() { +// ServiceLoader<DataSourceProvider> ldr = ServiceLoader.load(DataSourceProvider.class); +// CompositeDataSource cds = new CompositeDataSource(); +// for (DataSourceProvider spiObject : ldr) { +// cds.putDataSource(spiObject.getName(), spiObject.createInstance()); +// } +// +// cds.getDataSourceProviders().forEach((s,d) -> { System.out.println("> " + s); }); +// +// this.gpCLient = new GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50)) +// .notificationExecutor(org.epics.util.concurrent.Executors.localThread()) +// .dataSource(cds) +// .dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1, Runtime.getRuntime().availableProcessors() - 1), +// org.epics.util.concurrent.Executors.namedPool("PVMgr RT Worker "))).build(); + + } + + @Override + public void destroy() { + + } + + @Override + public void stop() { + groups.forEach((g, o) -> { + scheduler.unschedule(g); + }); + } + + @Override + public void start() { + groups.forEach((g, o) -> { + try { + scheduler.schedule(o, o.getScheduleOptions()); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + }); + } + + @Override + public void updated(String pid, Dictionary<String, ?> properties) throws ConfigurationException { + Matcher matcher; + String strObject; + String strKey; + String strValue; + + stop(); + groups.clear(); + + if (null == properties) return; + //Group Section + Enumeration<String> enumKeys = properties.keys(); + while(enumKeys.hasMoreElements()) { + strKey = enumKeys.nextElement(); + strValue = (String) properties.get(strKey); + if ((matcher = GROUP_INDEX_PATTERN.matcher(strKey)).matches()) { + addGroup(strKey, strValue); + } + } + + //PV Section + enumKeys = properties.keys(); + while(enumKeys.hasMoreElements()) { + strKey = enumKeys.nextElement(); + strValue = (String) properties.get(strKey); + if ((matcher = PV_INDEX_PATTERN.matcher(strKey)).matches()) { + String[] fields = strValue.split(";"); + + final SchedulerGroup group = groups.get(fields[1]); + + if (null != group) { + PVInfo pvInfo = new PVInfo(); + pvInfo.strPv = fields[0]; + pvInfo.strGroup = fields[1]; + pvInfo.delta = Double.parseDouble(fields[2]); + pvInfo.strTag = fields[3]; + + PVEventRecorder recorder = new PVEventRecorder(); + PVReader<VType> pvr = gpClient.read(pvInfo.strPv). + addListener(recorder). + addReadListener(this). + start(); + LOGGER.info("Registered RT Pv: " + pvInfo.strPv); + pvInfo.pvr = pvr; + pvInfo.lastValue = null; + group.addPvReader(strKey, pvInfo); + } + }; + } + } + + @Override + public String getName() { + return "Merlot - Rt"; + } + + @Override + public void deleted(String pid) { + // + } + + @Override + public void addGroup(String strGroup, String... args) { + if (null != args) { + if (null != args[0]) { + Integer period = Integer.parseInt(args[0]) * 1000; + ScheduleOptions schOptions = scheduler.AT(Date.from(Instant.now()), -1, period); + schOptions.name(strGroup); + + SchedulerGroup group = new SchedulerGroup(eventAdmin, schOptions); + groups.put(strGroup, group); + + try { + scheduler.schedule(group, schOptions); + } catch (Exception ex) { + LOGGER.error(ex.getMessage()); + } + + } + } + } + + @Override + public void removeGroup(String strGroup) { + scheduler.unschedule(strGroup); + groups.remove(strGroup); + } + + @Override + public void schedulerGroup(String strGroup, int scanTime) { + + + } + + @Override + public void putPvRecord(String strPvName, String... args) { + if (null != args){ + if (null != args[0]) { + + } + } + } + + @Override + public void removePvRecord(String strPvName) { + + } + + @Override + public void pvChanged(PVEvent event, PVReader pvReader) { + if (event.isType(PVEvent.Type.EXCEPTION)) { + LOGGER.info("EVENT: " + event.toString()); + } + + } + + private class PVInfo { + public PVReader<VType> pvr; + public VNumber lastValue; + public String strPv; + public String strGroup; + public Double delta; + public String strTag; + } + + private class SchedulerGroup implements Job { + private final EventAdmin eventAdmin; + private final ScheduleOptions schOptions; + + private final Map<String, PVInfo> pvs = new ConcurrentHashMap<>(); + private Map<String, String> properties = new Hashtable(); + private VNumber value; + + public SchedulerGroup(EventAdmin eventAdmin, ScheduleOptions schOptions) { + this.eventAdmin = eventAdmin; + this.schOptions = schOptions; + } + + @Override + public void execute(JobContext context) { + pvs.forEach(new BiConsumer<String, PVInfo>() { + @Override + public void accept(String s, PVInfo pv) { + + if ((pv.pvr.isConnected()) && (!pv.pvr.isPaused())) { + + value = (VNumber) pv.pvr.getValue(); + if ((null == pv.lastValue) || !value.equals(pv.lastValue)) { + + Double actualValue = ((VNumber) value).getValue().doubleValue(); + Double lastValue = (null == pv.lastValue)? 0 : pv.lastValue.getValue().doubleValue(); + + if ((Math.abs(actualValue - lastValue)) > pv.delta) { + pv.lastValue = value; + properties.clear(); + properties.put("tag", pv.strTag); + properties.put("value", value.getValue().toString()); + + EventProperties eventProps = new EventProperties(properties); + + Event decanterEvent = new Event(HTC_ROUTE, properties); + eventAdmin.postEvent(decanterEvent); + } + + } + } + } + }); + + } + + public void addPvReader(final String strPVIndex, final PVInfo pvInfo){ + pvs.put(strPVIndex, pvInfo); + } + + public ScheduleOptions getScheduleOptions(){ + return this.schOptions; + } + + } + + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml new file mode 100644 index 0000000..12623ac --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml @@ -0,0 +1,117 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0" default-activation="eager"> + + + <reference + id="refSchedulerService" + interface="org.apache.plc4x.merlot.scheduler.api.Scheduler" availability="mandatory" timeout="1200"/> + + <reference + id="refEventAdminService" + interface="org.osgi.service.event.EventAdmin" availability="mandatory" timeout="1200"/> + + <bean id="MerlotGPClientBean" + class="org.apache.plc4x.merlot.archive.impl.MerlotGPClientImpl" + init-method="init" + destroy-method="destroy" + scope="singleton" + activation="eager"> + </bean> + + <bean id="MerlotMqttAppenderFactoryBean" + class="org.apache.plc4x.merlot.archive.impl.MerlotMqttAppenderFactory" + scope="singleton" + activation="eager"> + </bean> + + <bean id="MerlotDecanterManagedServiceBean" + class="org.apache.plc4x.merlot.archive.core.MerlotDecanterManagedService" + scope="singleton" + activation="eager"> + <argument ref="blueprintBundleContext" /> + </bean> + + <bean id="MerlotPvHtcCollectorImplBean" + class="org.apache.plc4x.merlot.archive.impl.MerlotPvHtcCollectorImpl" + init-method="init" + scope="singleton" + activation="eager"> + <argument ref="refSchedulerService" /> + <argument ref="refEventAdminService" /> + <argument ref="MerlotGPClientBean" /> + </bean> + + <bean id="MerlotPvRtCollectorImplBean" + class="org.apache.plc4x.merlot.archive.impl.MerlotPvRtCollectorImpl" + init-method="init" + scope="singleton" + activation="eager"> + <argument ref="refSchedulerService" /> + <argument ref="refEventAdminService" /> + <argument ref="MerlotGPClientBean" /> + </bean> + + <bean id="MerlotDataBrowserSupportImplBean" + class="org.apache.plc4x.merlot.archive.impl.MerlotDataBrowserSupportImpl" + init-method="init" + scope="singleton" + activation="eager"> + <argument ref="refSchedulerService" /> + <argument ref="refEventAdminService" /> + <argument ref="MerlotGPClientBean" /> + </bean> + + <service ref="MerlotGPClientBean" auto-export="interfaces"> + </service> + + <service ref="MerlotMqttAppenderFactoryBean" auto-export="interfaces"> + <service-properties> + <entry key="org.plc4x.merlot.archive.factory" value="mqtt-appender"/> + </service-properties> + </service> + + <service ref="MerlotDecanterManagedServiceBean" auto-export="interfaces"> + <service-properties> + <entry key="service.pid" value="org.apache.plc4x.merlot.archive"/> + <entry key="scheduler.name" value="MerlotDecanterManagedServiceBean"/> + <entry key="scheduler.period" value="5000"/> + <entry key="scheduler.immediate" value="true"/> + <entry key="scheduler.concurrent" value="false"/> + </service-properties> + </service> + + <service ref="MerlotPvHtcCollectorImplBean" auto-export="interfaces"> + <service-properties> + <entry key="service.pid" value="org.apache.plc4x.merlot.pvhtc"/> + </service-properties> + </service> + + <service ref="MerlotPvRtCollectorImplBean" auto-export="interfaces"> + <service-properties> + <entry key="service.pid" value="org.apache.plc4x.merlot.pvrt"/> + </service-properties> + </service> + + <service ref="MerlotDataBrowserSupportImplBean" auto-export="interfaces"> + <service-properties> + <entry key="service.pid" value="org.apache.plc4x.merlot.archive"/> + </service-properties> + </service> + +</blueprint> \ No newline at end of file diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.db/nbproject/project.properties b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/nbproject/project.properties new file mode 100644 index 0000000..e69de29 diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java new file mode 100644 index 0000000..227f74c --- /dev/null +++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.plc4x.merlot.db.core; + +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.runner.RunWith; +import org.junit.runners.Suite; + +/** + * + * @author cgarcia + */ +@RunWith(Suite.class) [email protected]({ + org.apache.plc4x.merlot.db.core.DBBooleanFactoryTest.class +}) +public class DBTestSuite { + +} diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.drv.s7/nbproject/project.properties b/plc4j/tools/merlot/org.apache.plc4x.merlot.drv.s7/nbproject/project.properties new file mode 100644 index 0000000..e69de29
