This is an automated email from the ASF dual-hosted git repository.
cgarcia pushed a commit to branch feature/merlot
in repository https://gitbox.apache.org/repos/asf/plc4x-extras.git
The following commit(s) were added to refs/heads/feature/merlot by this push:
new 44ea19f Implementing pbraw RT and pbraw IoTDB wrapper.[ (#542)
44ea19f is described below
commit 44ea19f1af3826d1c732792a750e5c8d1cf63a01
Author: César José García León <[email protected]>
AuthorDate: Thu Nov 20 10:29:02 2025 -0400
Implementing pbraw RT and pbraw IoTDB wrapper.[ (#542)
---
.../merlot/org.apache.plc4x.merlot.archive/pom.xml | 175 +++++++++++
.../main/cfg/org.apache.plc4x.merlot.archive.cfg | 23 ++
.../org.apache.plc4x.merlot.decanter-grafana.cfg | 45 +++
.../cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg | 44 +++
.../src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg | 39 +++
.../plc4x/merlot/archive/api/BrokerService.java | 25 ++
.../plc4x/merlot/archive/api/MerlotAppender.java | 30 ++
.../plc4x/merlot/archive/api/MerlotCollector.java | 70 +++++
.../archive/api/MerlotDataBrowserSupport.java | 26 ++
.../merlot/archive/api/MerlotDecanterFactory.java | 29 ++
.../plc4x/merlot/archive/api/MerlotGPClient.java | 209 +++++++++++++
.../merlot/archive/api/MerlotIoTDBWrapper.java | 25 ++
.../archive/core/MerlotDecanterManagedService.java | 114 +++++++
.../archive/core/MerlotHtcManagedService.java | 31 ++
.../merlot/archive/impl/BrokerServiceImpl.java | 68 +++++
.../archive/impl/MerlotDataBrowserSupportImpl.java | 80 +++++
.../merlot/archive/impl/MerlotGPClientImpl.java | 276 +++++++++++++++++
.../archive/impl/MerlotMqttAppenderFactory.java | 51 ++++
.../archive/impl/MerlotMqttAppenderImpl.java | 175 +++++++++++
.../archive/impl/MerlotPvHtcCollectorImpl.java | 328 +++++++++++++++++++++
.../archive/impl/MerlotPvRtCollectorImpl.java | 296 +++++++++++++++++++
.../OSGI-INF/blueprint/decanter-service.xml | 117 ++++++++
.../nbproject/project.properties | 0
.../apache/plc4x/merlot/db/core/DBTestSuite.java | 36 +++
.../nbproject/project.properties | 0
25 files changed, 2312 insertions(+)
diff --git a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml
new file mode 100644
index 0000000..b3f36d5
--- /dev/null
+++ b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/pom.xml
@@ -0,0 +1,175 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <artifactId>merlot</artifactId>
+ <groupId>org.apache.plc4x</groupId>
+ <version>0.13.0-SNAPSHOT</version>
+ </parent>
+
+ <groupId>org.apache.plc4x.merlot.archive</groupId>
+ <artifactId>org.apache.plc4x.merlot.archive</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ <packaging>bundle</packaging>
+
+ <name>PLC4J: Merlot :: archive :: Historical archive </name>
+ <description>decanter OSGi blueprint bundle project.</description>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>5.1.9</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+
<Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Bundle-Version>${project.version}</Bundle-Version>
+
<Export-Package>org.apache.plc4x.merlot.archive*;version=${project.version}</Export-Package>
+ <Import-Package>org.epics.gpclient.datasource.sim,
+ org.epics.gpclient.datasource.pva,
+ com.sun.net.httpserver.*,
+ *
+ </Import-Package>
+
<Karaf-Commands>org.apache.plc4x.merlot.api.command*</Karaf-Commands>
+
<!--<_removeheaders>Import-Service,Export-Service</_removeheaders>-->
+ <SPI-Consumer>*</SPI-Consumer>
+ </instructions>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>17</source>
+ <target>17</target>
+ <maxmem>256M</maxmem>
+ </configuration>
+ </plugin>
+
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>attach-artifact</goal>
+ </goals>
+ <configuration>
+ <artifacts>
+ <artifact>
+
<file>src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg</file>
+ <type>cfg</type>
+ </artifact>
+ <artifact>
+
<file>src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg</file>
+ <type>cfg2</type>
+ </artifact>
+ <artifact>
+
<file>src/main/cfg/org.apache.plc4x.merlot.archive.cfg</file>
+ <type>cfg3</type>
+ </artifact>
+ </artifacts>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>2.0.16</version>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.paho</groupId>
+ <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+ <version>1.2.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>osgi.core</artifactId>
+ <version>8.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.plc4x.merlot.scheduler</groupId>
+ <artifactId>org.apache.plc4x.merlot.scheduler</artifactId>
+ <version>0.13.0-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ <version>5.0.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>gpclient</artifactId>
+ <version>1.0.8</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>gpclient-pva</artifactId>
+ <version>1.0.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>gpclient-sim</artifactId>
+ <version>1.0.8</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.17.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>epics-ntypes</artifactId>
+ <version>0.3.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>vtype</artifactId>
+ <version>1.0.7</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>epics-vtype-all</artifactId>
+ <version>1.0.7</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>vtype-json</artifactId>
+ <version>2.9.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.epics</groupId>
+ <artifactId>gpclient-core</artifactId>
+ <version>1.0.8</version>
+ </dependency>
+ </dependencies>
+</project>
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg
new file mode 100644
index 0000000..4a56335
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.archive.cfg
@@ -0,0 +1,23 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+parametro_01 = 01
+parametro_02 = 02
+parametro_03 = 03
+
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg
new file mode 100644
index 0000000..c856a67
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-grafana.cfg
@@ -0,0 +1,45 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+######################################
+# Decanter MQTT Appender Configuration
+######################################
+
+#Factory to build the EventHandler
+#factory = mqtt-appender
+
+#Decanter event filter
+#eventtopic = decanter/grafana/*
+
+# Location of the MQTT server
+#server = tcp://localhost:1883
+#username = merlot
+#password = merlot
+
+# MQTT client ID for the appender
+#clientId = d:decanter:appender:default
+
+# MQTT topic where to send the collected events
+#topic = decanter
+
+# Marshaller to use
+#marshaller.target = (dataFormat=json)
+
+# WatchDog Timer (msec)
+#watchdogtime = 10000;
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg
new file mode 100644
index 0000000..79ee1e8
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.decanter-iotdb.cfg
@@ -0,0 +1,44 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+######################################
+# Decanter MQTT Appender Configuration
+######################################
+#Factory to build the EventHandler
+#factory = mqtt-appender
+
+#Decanter event filter
+#eventtopic = decanter/htc/*
+
+# Location of the MQTT server
+#server = tcp://localhost:1883
+#username = merlot
+#password = merlot
+
+# MQTT client ID for the appender
+#clientId = d:decanter:appender:default
+
+# MQTT topic where to send the collected events
+#topic = decanter
+
+# Marshaller to use
+#marshaller.target = (dataFormat=json)
+
+# WatchDog Timer (msec)
+#watchdogtime = 10000;
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg
new file mode 100644
index 0000000..31ba8b1
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/cfg/org.apache.plc4x.merlot.pvhtc.cfg
@@ -0,0 +1,39 @@
+################################################################################
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+################################################################################
+
+################################################################################
+# /Group collection ID
+# | /Scan time seconds
+# | |
+# G = 1
+################################################################################
+#G0001=1
+
+################################################################################
+# / Historical collection ID
+# | / Process value
+# | | / Group scan time seconds
+# | | | / Hysterisis in %
+# | | | | / Delta value to register the PV
+# | | | | | / Device for IoTB
+# | | | | | | / tag of mesurement
+# | | | | | | |
+# PV = pva://UNIDAD:CELDA:NIVEL/value;G0001;5;device;measurements
+################################################################################
+#PV0001=pva://UNIDAD:CELDA:NIVEL/value;G0001;5;root.system.tank;20lt01
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java
new file mode 100644
index 0000000..04e239c
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/BrokerService.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+public interface BrokerService {
+
+ public void init();
+
+ public void destroy();
+
+}
\ No newline at end of file
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java
new file mode 100644
index 0000000..e82ae33
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotAppender.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+import org.apache.plc4x.merlot.scheduler.api.Job;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.event.EventHandler;
+
+
+public interface MerlotAppender extends EventHandler, Job {
+
+ public void init();
+
+ public void destroy();
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java
new file mode 100644
index 0000000..b843e32
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotCollector.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+
+public interface MerlotCollector {
+
+ public void init();
+
+
+ public void destroy();
+
+ /*
+ * Start the scan of all groups
+ */
+ public void start();
+
+ /*
+ * Stop the scan of all groups
+ */
+ public void stop();
+
+ /*
+ * Add un scan group to the collector
+ * @param strGroup the group name, must be unique.
+ * @param args[0] the scan time in seconds
+ */
+ public void addGroup(String strGroup, String... args);
+
+ /*
+ * Remove the scan group to the collector
+ * @param strGroup the group name, must be unique.
+ */
+ public void removeGroup(String strGroup);
+
+ /*
+ * Reschedule the group, inte second specify in scanTime
+ * @param strGroup the group name, must be unique.
+ * @param scanTime the scan time in seconds
+ */
+ public void schedulerGroup(String strGroup, int scanTime);
+
+ /*
+ * Reschedule the group, inte second specify in scanTime
+ * @param strGroup the group name, must be unique.
+ * @param args[0] the scan time in seconds
+ */
+ public void putPvRecord(String strPvName, String... args);
+
+ /*
+ * Remove the PvRecord from group.
+ * @param strGroup the group name, must be unique.
+ */
+ public void removePvRecord(String strPvName);
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java
new file mode 100644
index 0000000..c2a21f5
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDataBrowserSupport.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+
+public interface MerlotDataBrowserSupport {
+
+ public void init();
+
+ public void destroy();
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java
new file mode 100644
index 0000000..01e6a0f
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotDecanterFactory.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+import java.util.Dictionary;
+import java.util.Optional;
+import org.osgi.service.event.EventHandler;
+
+
+public interface MerlotDecanterFactory {
+
+ Optional<MerlotAppender> createBundle(Dictionary<String, ?> props);
+
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java
new file mode 100644
index 0000000..8291996
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotGPClient.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+import java.util.List;
+import java.util.concurrent.Future;
+import org.epics.gpclient.CollectorExpression;
+import org.epics.gpclient.Expression;
+import org.epics.gpclient.GPClientInstance;
+import org.epics.gpclient.PVConfiguration;
+import org.epics.gpclient.PVReaderConfiguration;
+import org.epics.gpclient.ReadCollector;
+import org.epics.gpclient.WriteCollector;
+import org.epics.vtype.VType;
+
+/**
+* This is an implementation of GPClient that solves the import
+* of SPI services of the DataSourceProvider type during Merlot restart.
+*
+* TODO: Evaluate the original GPClient implementation to
+* solve the service loading problem.
+*/
+public interface MerlotGPClient {
+
+ /**
+ * DataSources are created from the DataSourceProvider services
+ * available in the CLASSPATH.
+ */
+ public void init();
+
+ /**
+ *
+ */
+ public void destroy();
+
+ /**
+ * Reads the value of the given expression, asking for {@link VType}
values.
+ *
+ * @param channelName the name of the channel
+ * @return the future value
+ */
+ public Future<VType> readOnce(String channelName);
+
+ /**
+ * Reads the value of the given expression.
+ *
+ * @param <R> the read type
+ * @param expression the expression to read
+ * @return the future value
+ */
+ public <R> Future<R> readOnce(Expression<R, ?> expression);
+
+ /**
+ * Reads the channel with the given name, asking for {@link VType} values.
+ *
+ * @param channelName the name of the channel
+ * @return the configuration options
+ */
+ public PVReaderConfiguration<VType> read(String channelName);
+
+ /**
+ * Reads the given expression.
+ *
+ * @param <R> the read type
+ * @param expression the expression to read
+ * @return the configuration options
+ */
+ public <R> PVReaderConfiguration<R> read(Expression<R, ?> expression);
+
+ /**
+ * Reads and writes the channel with the given name, asking for {@link
VType} values.
+ *
+ * @param channelName the name of the channel
+ * @return the configuration options
+ */
+ public PVConfiguration<VType, Object> readAndWrite(String channelName);
+
+ /**
+ * Reads and writes the given expression.
+ *
+ * @param <R> the read type
+ * @param <W> the write type
+ * @param expression the expression to read and write
+ * @return the configuration options
+ */
+ public <R, W> PVConfiguration<R, W> readAndWrite(Expression<R, W>
expression);
+
+ /**
+ * Keep only the latest value from the channel.
+ * <p>
+ * In case of data bursts (i.e. data coming in at rate faster than the
+ * reader can handle) this strategy will skip the notification in between,
+ * but always notify on the last value.
+ *
+ * @param <R> the type to read
+ * @param readType the type to read
+ * @return the caching strategy
+ */
+ public <R> ReadCollector<R, R> cacheLastValue(Class<R> readType);
+
+ /**
+ * Return all the values queued from the last update.
+ * <p>
+ * In case of data bursts (i.e. data coming in at rate faster than the
+ * reader can handle) this strategy will combine the notifications and
+ * return all the values.
+ *
+ * @param <R> the type to read
+ * @param readType the type to read
+ * @return the caching strategy
+ */
+ public <R> ReadCollector<R, List<R>> queueAllValues(Class<R> readType);
+
+ /**
+ * A write buffer for the the given type.
+ *
+ * @param <W> the type to write
+ * @param writeType the type to write
+ * @return the caching strategy
+ */
+ public <W> WriteCollector<W> writeType(Class<W> writeType);
+
+ /**
+ * A channel that reads and writes the given data types with the given
strategy.
+ *
+ * @param <R> the type to read
+ * @param <W> the type to write
+ * @param channelName the name of the channel
+ * @param readCollector the read buffer
+ * @param writeCollector the write buffer
+ * @return a new channel expression
+ */
+ public <R, W> Expression<R, W> channel(String channelName,
ReadCollector<?, R> readCollector, WriteCollector<W> writeCollector);
+
+ /**
+ * A channel that reads the given data type with the given strategy.
+ *
+ * @param <R> the type to read
+ * @param channelName the name of the channel
+ * @param readCollector the read buffer
+ * @return a new channel expression
+ */
+ public <R> Expression<R, Object> channel(String channelName,
ReadCollector<?, R> readCollector);
+
+ /**
+ * A channel that reads {@link VType}s caching the latest value.
+ *
+ * @param channelName the name of the channel
+ * @return a new channel expression
+ */
+ public Expression<VType, Object> channel(String channelName);
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @param <R> the type to read
+ * @param <C> the type to collect
+ * @param <W> the type to write
+ * @param readCollector the read buffer
+ * @param writeCollector the write buffer
+ * @return a new collector expression
+ */
+ public <R, C, W> CollectorExpression<R, C, W> collector(ReadCollector<C,
R> readCollector, WriteCollector<W> writeCollector);
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @param <R> the type to read
+ * @param <C> the type to collect
+ * @param readCollector the read buffer
+ * @return a new collector expression
+ */
+ public <R, C> CollectorExpression<R, C, Object> collector(ReadCollector<C,
R> readCollector);
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @return a new collector expression
+ */
+ public CollectorExpression<VType, VType, Object> collector();
+
+ /**
+ * The default instance of the general purpose client.
+ *
+ * @return the default instance
+ */
+ public GPClientInstance defaultInstance();
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java
new file mode 100644
index 0000000..e780304
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/api/MerlotIoTDBWrapper.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.api;
+
+
+public interface MerlotIoTDBWrapper {
+
+ public void init();
+
+ public void destroy();
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java
new file mode 100644
index 0000000..3f8e253
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotDecanterManagedService.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.core;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+import java.util.Optional;
+import org.apache.plc4x.merlot.archive.api.MerlotAppender;
+import org.apache.plc4x.merlot.archive.api.MerlotDecanterFactory;
+import org.apache.plc4x.merlot.scheduler.api.Job;
+import org.apache.plc4x.merlot.scheduler.api.JobContext;
+import org.apache.plc4x.merlot.scheduler.api.Scheduler;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.event.EventConstants;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MerlotDecanterManagedService implements ManagedServiceFactory,
Job {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MerlotDecanterManagedService.class);
+
+ private final BundleContext ctx;
+
+ public MerlotDecanterManagedService(BundleContext ctx) {
+ this.ctx = ctx;
+ }
+
+ @Override
+ public void updated(String pid, Dictionary<String, ?> props) throws
ConfigurationException {
+ String strFactory = (String) props.get("factory");
+
+ if (null != strFactory) {
+ MerlotDecanterFactory factory = getFactory(strFactory);
+ if (null != factory) {
+ Optional<MerlotAppender> optBundle =
factory.createBundle(props);
+ if (optBundle.isPresent()) {
+ //Register the decanter service
+ optBundle.get().init();
+ String strEventTopic = (String) props.get("eventtopic");
+ Dictionary<String, String> properties = new Hashtable<>();
+
+ properties.put(EventConstants.EVENT_TOPIC, strEventTopic);
+ properties.put(Scheduler.PROPERTY_SCHEDULER_NAME,
"MerlotAppender:" + getValue(props, "clientId"));
+ properties.put(Scheduler.PROPERTY_SCHEDULER_PERIOD,
getValue(props, "watchdogtime"));
+ properties.put(Scheduler.PROPERTY_SCHEDULER_IMMEDIATE,
"true");
+ properties.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT,
"false");
+
+ ctx.registerService(new
String[]{MerlotAppender.class.getName(), EventHandler.class.getName(),
Job.class.getName()} , optBundle.get(), properties);
+
+ } else {
+ LOGGER.info("Bundle for [" + strFactory +"] not
present.");
+ }
+
+ } else {
+ LOGGER.info("Factory service [" + strFactory +"] don't
found.");
+ }
+ } else {
+ LOGGER.info("Factory string [" + strFactory +"] don't found in
config file.");
+ }
+ }
+
+ @Override
+ public void execute(JobContext context) {
+ //LOGGER.info("EXECUTE");
+ }
+
+ @Override
+ public String getName() {
+ LOGGER.info("GET NAME");
+ return "Merlot-Decanter";
+ }
+
+ @Override
+ public void deleted(String pid) {
+ LOGGER.info("DELETE");
+ }
+
+ private String getValue(Dictionary<String, ?> props, String strKey){
+ return (null == props.get(strKey))? "" : (String) props.get(strKey);
+ }
+
+ private MerlotDecanterFactory getFactory(String strFactory){
+ try{
+ String filterdriver = "(org.plc4x.merlot.decanter.factory=" +
strFactory + ")";
+ ServiceReference[] refdrvs =
ctx.getAllServiceReferences(MerlotDecanterFactory.class.getName(),
filterdriver);
+ MerlotDecanterFactory refDev = (MerlotDecanterFactory)
ctx.getService(refdrvs[0]);
+ if (refDev == null) LOGGER.info("Device [" + strFactory + "] don't
found");
+ return refDev;
+ } catch (Exception ex){
+ LOGGER.error("getDevice: " + ex.toString());
+ }
+ return null;
+ }
+
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java
new file mode 100644
index 0000000..d6d00e1
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/core/MerlotHtcManagedService.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.core;
+
+import java.util.Dictionary;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+
+
+public class MerlotHtcManagedService implements ManagedService {
+
+ @Override
+ public void updated(Dictionary<String, ?> properties) throws
ConfigurationException {
+ System.out.println("CONFIGURING HTC");
+ }
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java
new file mode 100644
index 0000000..c909527
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/BrokerServiceImpl.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.merlot.archive.impl;
+
+
+import java.util.logging.Level;
+import org.apache.plc4x.merlot.archive.api.BrokerService;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class BrokerServiceImpl implements BrokerService {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BrokerServiceImpl.class);
+
+ private MqttClient client;
+
+ @Override
+ public void init() {
+ System.out.println(">>>> Init <");
+ try {
+ client = new MqttClient("tcp://10.10.1.104:1883", "clientId", new
MemoryPersistence());
+ MqttConnectOptions options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setUserName("merlot");
+ options.setPassword("merlot".toCharArray());
+ client.connect(options);
+
+ MqttMessage message = new MqttMessage();
+
+ message.setPayload("esto es una prueba".getBytes());
+
+ for (int i=0; i<100; i++){
+ client.publish("prueba", message);
+ }
+
+
+ } catch (MqttException ex) {
+ ex.printStackTrace();
+ }
+
+ }
+
+ @Override
+ public void destroy() {
+ System.out.println("Destroy");
+ }
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java
new file mode 100644
index 0000000..514882a
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotDataBrowserSupportImpl.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.impl;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import org.apache.plc4x.merlot.archive.api.MerlotGPClient;
+import org.apache.plc4x.merlot.scheduler.api.Scheduler;
+import org.osgi.service.event.EventAdmin;
+import org.slf4j.LoggerFactory;
+
+
+public class MerlotDataBrowserSupportImpl extends MerlotPvHtcCollectorImpl {
+
+ private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(MerlotDataBrowserSupportImpl.class);
+
+ private HttpServer server;
+
+
+ public MerlotDataBrowserSupportImpl(Scheduler scheduler, EventAdmin
eventAdmin, MerlotGPClient gpClient) {
+ super(scheduler, eventAdmin, gpClient);
+ }
+
+ @Override
+ public void start() {
+ super.start();
+ try {
+ server = HttpServer.create(new InetSocketAddress(2000), 0);
+ // Create a context for a specific path and set the handler
+ server.createContext("/request/bpl/searchForPVsRegex", new
MyHandler());
+ server.setExecutor(null); // Use the default executor
+ server.start();
+ System.out.println("Server is running on port 8000");
+ } catch (IOException e) {
+ System.out.println("Error starting the server: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ server.stop(10);
+ }
+
+ // Define a custom HttpHandler
+ static class MyHandler implements HttpHandler {
+ @Override
+ public void handle(HttpExchange exchange) throws IOException
+ {
+ // Handle the request
+ System.out.println("Protocol: " + exchange.getProtocol());
+ System.out.println("Method : " + exchange.getRequestMethod());
+ System.out.println("URI : " +
exchange.getRequestURI().toString());
+ String response = "uno\ndos\ntres";
+ exchange.sendResponseHeaders(200, response.length());
+ OutputStream os = exchange.getResponseBody();
+ os.write(response.getBytes());
+ os.close();
+ }
+ }
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java
new file mode 100644
index 0000000..a4acd6a
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotGPClientImpl.java
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.plc4x.merlot.archive.impl;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.ServiceLoader;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.plc4x.merlot.archive.api.MerlotGPClient;
+import org.epics.gpclient.CollectorExpression;
+import org.epics.gpclient.Expression;
+import org.epics.gpclient.GPClient;
+import org.epics.gpclient.GPClientConfiguration;
+import org.epics.gpclient.GPClientInstance;
+import org.epics.gpclient.PVConfiguration;
+import org.epics.gpclient.PVReaderConfiguration;
+import org.epics.gpclient.ReadCollector;
+import org.epics.gpclient.WriteCollector;
+import org.epics.gpclient.datasource.CompositeDataSource;
+import org.epics.gpclient.datasource.DataSourceProvider;
+import org.epics.vtype.VType;
+
+/**
+ *
+ * @author cgarcia
+ */
+public class MerlotGPClientImpl implements MerlotGPClient {
+
+ private GPClientInstance gpClient;
+
+
+ @Override
+ public void init() {
+ ServiceLoader<DataSourceProvider> ldr =
ServiceLoader.load(DataSourceProvider.class);
+ CompositeDataSource cds = new CompositeDataSource();
+ for (DataSourceProvider spiObject : ldr) {
+ cds.putDataSource(spiObject.getName(), spiObject.createInstance());
+ }
+
+ this.gpClient = new
GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50))
+
.notificationExecutor(org.epics.util.concurrent.Executors.localThread())
+ .dataSource(cds)
+
.dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1,
Runtime.getRuntime().availableProcessors() - 1),
+ org.epics.util.concurrent.Executors.namedPool("MerlotGPClient
Worker "))).build();
+ }
+
+ @Override
+ public void destroy() {
+ gpClient.getDefaultDataSource().getChannels().clear();
+ }
+
+ /**
+ * Reads the value of the given expression, asking for {@link VType}
values.
+ *
+ * @param channelName the name of the channel
+ * @return the future value
+ */
+ @Override
+ public Future<VType> readOnce(String channelName) {
+ return gpClient.readOnce(channelName);
+ }
+
+ /**
+ * Reads the value of the given expression.
+ *
+ * @param <R> the read type
+ * @param expression the expression to read
+ * @return the future value
+ */
+ @Override
+ public <R> Future<R> readOnce(Expression<R, ?> expression) {
+ return gpClient.readOnce(expression);
+ }
+
+ /**
+ * Reads the channel with the given name, asking for {@link VType} values.
+ *
+ * @param channelName the name of the channel
+ * @return the configuration options
+ */
+ @Override
+ public PVReaderConfiguration<VType> read(String channelName) {
+ return gpClient.read(channelName);
+ }
+
+ /**
+ * Reads the given expression.
+ *
+ * @param <R> the read type
+ * @param expression the expression to read
+ * @return the configuration options
+ */
+ @Override
+ public <R> PVReaderConfiguration<R> read(Expression<R, ?> expression) {
+ return gpClient.read(expression);
+ }
+
+ /**
+ * Reads and writes the channel with the given name, asking for {@link
VType} values.
+ *
+ * @param channelName the name of the channel
+ * @return the configuration options
+ */
+ @Override
+ public PVConfiguration<VType, Object> readAndWrite(String channelName) {
+ return gpClient.readAndWrite(channelName);
+ }
+
+ /**
+ * Reads and writes the given expression.
+ *
+ * @param <R> the read type
+ * @param <W> the write type
+ * @param expression the expression to read and write
+ * @return the configuration options
+ */
+ @Override
+ public <R, W> PVConfiguration<R, W> readAndWrite(Expression<R, W>
expression) {
+ return gpClient.readAndWrite(expression);
+ }
+
+ /**
+ * Keep only the latest value from the channel.
+ * <p>
+ * In case of data bursts (i.e. data coming in at rate faster than the
+ * reader can handle) this strategy will skip the notification in between,
+ * but always notify on the last value.
+ *
+ * @param <R> the type to read
+ * @param readType the type to read
+ * @return the caching strategy
+ */
+ @Override
+ public <R> ReadCollector<R, R> cacheLastValue(Class<R> readType) {
+ return GPClient.cacheLastValue(readType);
+ }
+
+ /**
+ * Return all the values queued from the last update.
+ * <p>
+ * In case of data bursts (i.e. data coming in at rate faster than the
+ * reader can handle) this strategy will combine the notifications and
+ * return all the values.
+ *
+ * @param <R> the type to read
+ * @param readType the type to read
+ * @return the caching strategy
+ */
+ @Override
+ public <R> ReadCollector<R, List<R>> queueAllValues(Class<R> readType) {
+ return GPClient.queueAllValues(readType);
+ }
+
+ /**
+ * A write buffer for the the given type.
+ *
+ * @param <W> the type to write
+ * @param writeType the type to write
+ * @return the caching strategy
+ */
+ @Override
+ public <W> WriteCollector<W> writeType(Class<W> writeType) {
+ return GPClient.writeType(writeType);
+ }
+
+ /**
+ * A channel that reads and writes the given data types with the given
strategy.
+ *
+ * @param <R> the type to read
+ * @param <W> the type to write
+ * @param channelName the name of the channel
+ * @param readCollector the read buffer
+ * @param writeCollector the write buffer
+ * @return a new channel expression
+ */
+ @Override
+ public <R, W> Expression<R, W> channel(String channelName,
ReadCollector<?, R> readCollector, WriteCollector<W> writeCollector) {
+ return GPClient.channel(channelName, readCollector, writeCollector);
+ }
+
+ /**
+ * A channel that reads the given data type with the given strategy.
+ *
+ * @param <R> the type to read
+ * @param channelName the name of the channel
+ * @param readCollector the read buffer
+ * @return a new channel expression
+ */
+ @Override
+ public <R> Expression<R, Object> channel(String channelName,
ReadCollector<?, R> readCollector) {
+ return GPClient.channel(channelName, readCollector);
+ }
+
+ /**
+ * A channel that reads {@link VType}s caching the latest value.
+ *
+ * @param channelName the name of the channel
+ * @return a new channel expression
+ */
+ @Override
+ public Expression<VType, Object> channel(String channelName) {
+ return GPClient.channel(channelName);
+ }
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @param <R> the type to read
+ * @param <C> the type to collect
+ * @param <W> the type to write
+ * @param readCollector the read buffer
+ * @param writeCollector the write buffer
+ * @return a new collector expression
+ */
+ @Override
+ public <R, C, W> CollectorExpression<R, C, W> collector(ReadCollector<C,
R> readCollector, WriteCollector<W> writeCollector) {
+ return GPClient.collector(readCollector, writeCollector);
+ }
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @param <R> the type to read
+ * @param <C> the type to collect
+ * @param readCollector the read buffer
+ * @return a new collector expression
+ */
+ @Override
+ public <R, C> CollectorExpression<R, C, Object> collector(ReadCollector<C,
R> readCollector) {
+ return GPClient.collector(readCollector);
+ }
+
+ /**
+ * An expression that allows to directly send/receive values to/from
+ * PVReaders/PVWriters. This can be used for testing purpose or to
integrate
+ * data models that do not fit datasources or services.
+ *
+ * @return a new collector expression
+ */
+ @Override
+ public CollectorExpression<VType, VType, Object> collector() {
+ return collector(cacheLastValue(VType.class));
+ }
+
+ /**
+ * The default instance of the general purpose client.
+ *
+ * @return the default instance
+ */
+ @Override
+ public GPClientInstance defaultInstance() {
+ return gpClient;
+ }
+
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java
new file mode 100644
index 0000000..4dc29f0
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.impl;
+
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.Optional;
+import org.apache.plc4x.merlot.archive.api.MerlotAppender;
+import org.apache.plc4x.merlot.archive.api.MerlotDecanterFactory;
+
+
+
+public class MerlotMqttAppenderFactory implements MerlotDecanterFactory {
+
+ @Override
+ public Optional<MerlotAppender> createBundle(Dictionary<String, ?> props) {
+ MerlotAppender appender = null;
+
+ appender = new MerlotMqttAppenderImpl.Builder().
+ ServerUri(getValue(props,"server")).
+ ClientId(getValue(props,"clientId")).
+ Topic(getValue(props,"topic")).
+ UserName(getValue(props,"username")).
+ Password(getValue(props,"password")).
+ EventTopic(getValue(props,"eventtopic")).
+ MarshallerTarget(getValue(props,"marshaller.target")).
+ WatchDogTime(getValue(props,"watchdogtime")).build();
+
+ return Optional.of(appender);
+
+ }
+
+ private String getValue(Dictionary<String, ?> props, String strKey){
+ return (null == props.get(strKey))? "" : (String) props.get(strKey);
+ }
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java
new file mode 100644
index 0000000..29897a2
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotMqttAppenderImpl.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.impl;
+
+import java.util.logging.Level;
+import org.apache.plc4x.merlot.archive.api.MerlotAppender;
+import org.apache.plc4x.merlot.scheduler.api.JobContext;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class MerlotMqttAppenderImpl implements MerlotAppender {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MerlotMqttAppenderImpl.class);
+
+ private final String strServerUri;
+ private final String strClientId;
+ private final String strTopic;
+ private final String strUserName;
+ private final String strPassword;
+ private final String strEventTopic;
+ private final String strMarshallerTarget;
+ private final String watchdogTime;
+
+ private MqttConnectOptions options;
+ private MqttClient client;
+ private MqttMessage message;
+
+ MerlotMqttAppenderImpl(Builder build) {
+ this.strServerUri = build.strServerUri;
+ this.strClientId = build.strClientId;
+ this.strTopic = build.strTopic;
+ this.strUserName = build.strUserName;
+ this.strPassword = build.strPassword;
+ this.strEventTopic = build.strEventTopic;
+ this.strMarshallerTarget = build.strMarshallerTarget;
+ this.watchdogTime = build.watchdogTime;
+ }
+
+
+ @Override
+ public void init() {
+ try {
+ message = new MqttMessage();
+ client = new MqttClient(strServerUri, strClientId, new
MemoryPersistence());
+ options = new MqttConnectOptions();
+ options.setCleanSession(true);
+ options.setUserName(strUserName);
+ options.setPassword(strPassword.toCharArray());
+ client.connect(options);
+ if (client.isConnected()) {
+ LOGGER.info("Conected.");
+ } else {
+ LOGGER.info("Not conected.");
+ }
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (null != client)
+ try {
+ client.close();
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ }
+
+ @Override
+ public void handleEvent(Event event) {
+ if ((null != client) & (client.isConnected())) {
+ final String strPayload = (String) event.getProperty("value");
+
+ if (null == message) {
+ message = new MqttMessage();
+ }
+
+ message.setPayload(strPayload.getBytes());
+
+ try {
+ client.publish((String) event.getProperty("tag"), message);
+ } catch (MqttException ex) {
+ LOGGER.error(ex.toString());
+ }
+ }
+ }
+
+ @Override
+ public void execute(JobContext context) {
+ if ((null == client) || (!client.isConnected())){
+ init();
+ }
+ }
+
+
+ public static class Builder {
+ private String strServerUri;
+ private String strClientId;
+ private String strTopic;
+ private String strUserName;
+ private String strPassword;
+ private String strEventTopic;
+ private String strMarshallerTarget;
+ private String watchdogTime;
+
+ public Builder ServerUri(String strServerUri){
+ this.strServerUri = strServerUri;
+ return this;
+ }
+
+ public Builder ClientId(String strClientId){
+ this.strClientId = strClientId;
+ return this;
+ }
+
+ public Builder Topic(String strTopic){
+ this.strTopic = strTopic;
+ return this;
+ }
+
+ public Builder UserName(String strUserName){
+ this.strUserName = strUserName;
+ return this;
+ }
+
+ public Builder Password(String strPassword){
+ this.strPassword = strPassword;
+ return this;
+ }
+
+ public Builder EventTopic(String strEventTopic){
+ this.strEventTopic = strEventTopic;
+ return this;
+ }
+
+ public Builder WatchDogTime(String watchdogTime){
+ this.watchdogTime = watchdogTime;
+ return this;
+ }
+
+ public Builder MarshallerTarget(String strMarshallerTarget){
+ this.strMarshallerTarget = strMarshallerTarget;
+ return this;
+ }
+
+ public MerlotMqttAppenderImpl build() {
+ return new MerlotMqttAppenderImpl (this);
+ }
+
+
+ }
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java
new file mode 100644
index 0000000..94f620f
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvHtcCollectorImpl.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.impl;
+
+import java.io.StringWriter;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.function.BiConsumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.json.Json;
+import javax.json.JsonObject;
+import javax.json.JsonWriter;
+import javax.json.JsonWriterFactory;
+import javax.json.stream.JsonGenerator;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.plc4x.merlot.archive.api.MerlotCollector;
+import org.apache.plc4x.merlot.archive.api.MerlotGPClient;
+import org.apache.plc4x.merlot.scheduler.api.Job;
+import org.apache.plc4x.merlot.scheduler.api.JobContext;
+import org.apache.plc4x.merlot.scheduler.api.ScheduleOptions;
+import org.apache.plc4x.merlot.scheduler.api.Scheduler;
+import org.epics.gpclient.GPClient;
+import org.epics.gpclient.GPClientConfiguration;
+import org.epics.gpclient.GPClientInstance;
+import org.epics.gpclient.PVEvent;
+import org.epics.gpclient.PVEventRecorder;
+import org.epics.gpclient.PVReader;
+import org.epics.gpclient.PVReaderListener;
+import org.epics.gpclient.datasource.CompositeDataSource;
+import org.epics.gpclient.datasource.DataSourceProvider;
+import org.epics.vtype.VNumber;
+import org.epics.vtype.VType;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedService;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventProperties;
+import org.slf4j.LoggerFactory;
+
+
+public class MerlotPvHtcCollectorImpl implements MerlotCollector,
ManagedServiceFactory, PVReaderListener {
+ private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(MerlotPvHtcCollectorImpl.class);
+ private static final String HTC_ROUTE = "decanter/collector/htc";
+ private static final Pattern GROUP_INDEX_PATTERN =
+ Pattern.compile("^HG(?<groupIndex>\\d{4})");
+ private static final Pattern PV_INDEX_PATTERN =
+ Pattern.compile("^PV(?<groupIndex>\\d{4})");
+
+ protected static final String GROUP_INDEX = "groupIndex";
+
+ private final Scheduler scheduler;
+ private final EventAdmin eventAdmin;
+ private final MerlotGPClient gpClient;
+ private final Map<String, SchedulerGroup> groups = new
ConcurrentHashMap<>();
+ private final Map<String, MutablePair<SchedulerGroup, PVReader<VType>>>
pvs = new ConcurrentHashMap<>();
+
+
+
+ public MerlotPvHtcCollectorImpl(Scheduler scheduler, EventAdmin
eventAdmin, MerlotGPClient gpClient) {
+ this.scheduler = scheduler;
+ this.eventAdmin = eventAdmin;
+ this.gpClient = gpClient;
+ }
+
+
+ @Override
+ public void init() {
+// ServiceLoader<DataSourceProvider> ldr =
ServiceLoader.load(DataSourceProvider.class);
+// CompositeDataSource cds = new CompositeDataSource();
+// for (DataSourceProvider spiObject : ldr) {
+// cds.putDataSource(spiObject.getName(),
spiObject.createInstance());
+// }
+//
+// cds.getDataSourceProviders().forEach((s,d) -> {
System.out.println(">> " + s); });
+//
+// this.gpCLient = new
GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50))
+//
.notificationExecutor(org.epics.util.concurrent.Executors.localThread())
+// .dataSource(cds)
+//
.dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1,
Runtime.getRuntime().availableProcessors() - 1),
+// org.epics.util.concurrent.Executors.namedPool("PVMgr HTC
Worker "))).build();
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void stop() {
+ groups.forEach((g, o) -> {
+ scheduler.unschedule(g);
+ });
+ }
+
+ @Override
+ public void start() {
+ groups.forEach((g, o) -> {
+ try {
+ scheduler.schedule(o, o.getScheduleOptions());
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ });
+ }
+
+ @Override
+ public void updated(String pid, Dictionary<String, ?> properties) throws
ConfigurationException {
+ Matcher matcher;
+ String strObject;
+ String strKey;
+ String strValue;
+
+ stop();
+ groups.clear();
+
+ if (null == properties) return;
+ //Group Section
+ Enumeration<String> enumKeys = properties.keys();
+ while(enumKeys.hasMoreElements()) {
+ strKey = enumKeys.nextElement();
+ strValue = (String) properties.get(strKey);
+ if ((matcher = GROUP_INDEX_PATTERN.matcher(strKey)).matches()) {
+ addGroup(strKey, strValue);
+ }
+ }
+
+ //PV Section
+ enumKeys = properties.keys();
+ while(enumKeys.hasMoreElements()) {
+ strKey = enumKeys.nextElement();
+ strValue = (String) properties.get(strKey);
+ if ((matcher = PV_INDEX_PATTERN.matcher(strKey)).matches()) {
+ String[] fields = strValue.split(";");
+
+ final SchedulerGroup group = groups.get(fields[1]);
+
+ if (null != group) {
+ PVInfo pvInfo = new PVInfo();
+ pvInfo.strPv = fields[0];
+ pvInfo.strGroup = fields[1];
+ pvInfo.delta = Double.parseDouble(fields[2]);
+ pvInfo.strDevice= fields[3];
+ pvInfo.strTag = fields[4];
+
+ PVEventRecorder recorder = new PVEventRecorder();
+ PVReader<VType> pvr = gpClient.read(pvInfo.strPv).
+ addListener(recorder).
+ addReadListener(this).
+ start();
+ LOGGER.info("Registered HTC Pv: " + pvInfo.strPv);
+ pvInfo.pvr = pvr;
+ pvInfo.lastValue = null;
+ group.addPvReader(strKey, pvInfo);
+ }
+ };
+ }
+ }
+
+
+ @Override
+ public String getName() {
+ return "Merlot - htc";
+ }
+
+ @Override
+ public void deleted(String pid) {
+ LOGGER.info("Remove config: " + pid);
+ }
+
+ @Override
+ public void addGroup(String strGroup, String... args) {
+ if (null != args) {
+ if (null != args[0]) {
+ Integer period = Integer.parseInt(args[0]) * 1000;
+ ScheduleOptions schOptions =
scheduler.AT(Date.from(Instant.now()), -1, period);
+ schOptions.name(strGroup);
+
+ SchedulerGroup group = new SchedulerGroup(eventAdmin,
schOptions);
+ groups.put(strGroup, group);
+
+ try {
+ scheduler.schedule(group, schOptions);
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public void removeGroup(String strGroup) {
+ scheduler.unschedule(strGroup);
+ groups.remove(strGroup);
+ }
+
+ @Override
+ public void schedulerGroup(String strGroup, int scanTime) {
+
+ }
+
+ @Override
+ public void putPvRecord(String strPvName, String... args) {
+ if (null != args){
+ if (null != args[0]) {
+
+ }
+ }
+ }
+
+ @Override
+ public void removePvRecord(String strPvName) {
+
+ }
+
+ @Override
+ public void pvChanged(PVEvent event, PVReader pvReader) {
+ if (event.isType(PVEvent.Type.EXCEPTION)) {
+ LOGGER.info("EVENT: " + event.toString());
+ }
+
+ }
+
+ private class PVInfo {
+ public PVReader<VType> pvr;
+ public VNumber lastValue;
+ public String strPv;
+ public String strGroup;
+ public Double delta;
+ public String strDevice;
+ public String strTag;
+ }
+
+ private class SchedulerGroup implements Job {
+ private final EventAdmin eventAdmin;
+ private final ScheduleOptions schOptions;
+
+ private final Map<String, PVInfo> pvs = new ConcurrentHashMap<>();
+ private Map<String, String> properties = new Hashtable();
+ private VNumber value;
+
+ private Map<String, Boolean> config = new HashMap<String, Boolean>();
+
+ public SchedulerGroup(EventAdmin eventAdmin, ScheduleOptions
schOptions) {
+ this.eventAdmin = eventAdmin;
+ this.schOptions = schOptions;
+ }
+
+ @Override
+ public void execute(JobContext context) {
+ pvs.forEach(new BiConsumer<String, PVInfo>() {
+ @Override
+ public void accept(String s, PVInfo pv) {
+
+ if ((pv.pvr.isConnected()) && (!pv.pvr.isPaused())) {
+
+ value = (VNumber) pv.pvr.getValue();
+ if ((null == pv.lastValue) ||
!value.equals(pv.lastValue)) {
+
+ Double actualValue =
value.getValue().doubleValue();
+ Double lastValue = (null == pv.lastValue)? 0 :
pv.lastValue.getValue().doubleValue();
+
+ if ((Math.abs(actualValue - lastValue)) >
pv.delta) {
+ pv.lastValue = value;
+
+ long timeEpoch =
value.getTime().getTimestamp().toEpochMilli();
+
+ String strValue = String.format("{\n" +
+ "\"device\":\"" + pv.strDevice
+"\",\n" +
+ "\"timestamp\":\"%d\",\n" +
+ "\"measurements\":[\""+ pv.strTag +
"\"],\n" +
+ "\"values\":[\"%f\"]\n" +
+ "}", timeEpoch,
value.getValue().doubleValue() );
+
+ properties.clear();
+ properties.put("tag", pv.strDevice);
+ properties.put("value", strValue);
+
+ EventProperties eventProps = new
EventProperties(properties);
+
+ Event decanterEvent = new Event(HTC_ROUTE,
properties);
+ eventAdmin.postEvent(decanterEvent);
+ }
+
+ }
+ }
+ }
+ });
+
+ }
+
+ public void addPvReader(final String strPVIndex, final PVInfo pvInfo){
+ pvs.put(strPVIndex, pvInfo);
+ }
+
+ public ScheduleOptions getScheduleOptions(){
+ return this.schOptions;
+ }
+
+ }
+
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java
new file mode 100644
index 0000000..4738933
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/java/org/apache/plc4x/merlot/archive/impl/MerlotPvRtCollectorImpl.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.archive.impl;
+
+import java.time.Instant;
+import java.util.Date;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiConsumer;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.plc4x.merlot.archive.api.MerlotCollector;
+import org.apache.plc4x.merlot.archive.api.MerlotGPClient;
+import org.apache.plc4x.merlot.scheduler.api.Job;
+import org.apache.plc4x.merlot.scheduler.api.JobContext;
+import org.apache.plc4x.merlot.scheduler.api.ScheduleOptions;
+import org.apache.plc4x.merlot.scheduler.api.Scheduler;
+import org.epics.gpclient.PVEvent;
+import org.epics.gpclient.PVEventRecorder;
+import org.epics.gpclient.PVReader;
+import org.epics.gpclient.PVReaderListener;
+import org.epics.vtype.*;
+import org.osgi.service.cm.ConfigurationException;
+import org.osgi.service.cm.ManagedServiceFactory;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventAdmin;
+import org.osgi.service.event.EventProperties;
+import org.slf4j.LoggerFactory;
+
+
+public class MerlotPvRtCollectorImpl implements MerlotCollector,
ManagedServiceFactory, PVReaderListener {
+ private static final org.slf4j.Logger LOGGER =
LoggerFactory.getLogger(MerlotPvRtCollectorImpl.class);
+ private static final String HTC_ROUTE = "decanter/collector/rt";
+ private static final Pattern GROUP_INDEX_PATTERN =
+ Pattern.compile("^RG(?<groupIndex>\\d{4})");
+ private static final Pattern PV_INDEX_PATTERN =
+ Pattern.compile("^PV(?<groupIndex>\\d{4})");
+
+ protected static final String GROUP_INDEX = "groupIndex";
+
+ private final Scheduler scheduler;
+ private final EventAdmin eventAdmin;
+ private final MerlotGPClient gpClient;
+ private final Map<String, SchedulerGroup> groups = new
ConcurrentHashMap<>();
+ private final Map<String, MutablePair<SchedulerGroup, PVReader<VType>>>
pvs = new ConcurrentHashMap<>();
+
+ public MerlotPvRtCollectorImpl(Scheduler scheduler, EventAdmin eventAdmin,
MerlotGPClient gpClient) {
+ this.scheduler = scheduler;
+ this.eventAdmin = eventAdmin;
+ this.gpClient = gpClient;
+ }
+
+
+ @Override
+ public void init() {
+// ServiceLoader<DataSourceProvider> ldr =
ServiceLoader.load(DataSourceProvider.class);
+// CompositeDataSource cds = new CompositeDataSource();
+// for (DataSourceProvider spiObject : ldr) {
+// cds.putDataSource(spiObject.getName(),
spiObject.createInstance());
+// }
+//
+// cds.getDataSourceProviders().forEach((s,d) -> {
System.out.println("> " + s); });
+//
+// this.gpCLient = new
GPClientConfiguration().defaultMaxRate(Duration.ofMillis(50))
+//
.notificationExecutor(org.epics.util.concurrent.Executors.localThread())
+// .dataSource(cds)
+//
.dataProcessingThreadPool(Executors.newScheduledThreadPool(Math.max(1,
Runtime.getRuntime().availableProcessors() - 1),
+// org.epics.util.concurrent.Executors.namedPool("PVMgr RT
Worker "))).build();
+
+ }
+
+ @Override
+ public void destroy() {
+
+ }
+
+ @Override
+ public void stop() {
+ groups.forEach((g, o) -> {
+ scheduler.unschedule(g);
+ });
+ }
+
+ @Override
+ public void start() {
+ groups.forEach((g, o) -> {
+ try {
+ scheduler.schedule(o, o.getScheduleOptions());
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+ });
+ }
+
+ @Override
+ public void updated(String pid, Dictionary<String, ?> properties) throws
ConfigurationException {
+ Matcher matcher;
+ String strObject;
+ String strKey;
+ String strValue;
+
+ stop();
+ groups.clear();
+
+ if (null == properties) return;
+ //Group Section
+ Enumeration<String> enumKeys = properties.keys();
+ while(enumKeys.hasMoreElements()) {
+ strKey = enumKeys.nextElement();
+ strValue = (String) properties.get(strKey);
+ if ((matcher = GROUP_INDEX_PATTERN.matcher(strKey)).matches()) {
+ addGroup(strKey, strValue);
+ }
+ }
+
+ //PV Section
+ enumKeys = properties.keys();
+ while(enumKeys.hasMoreElements()) {
+ strKey = enumKeys.nextElement();
+ strValue = (String) properties.get(strKey);
+ if ((matcher = PV_INDEX_PATTERN.matcher(strKey)).matches()) {
+ String[] fields = strValue.split(";");
+
+ final SchedulerGroup group = groups.get(fields[1]);
+
+ if (null != group) {
+ PVInfo pvInfo = new PVInfo();
+ pvInfo.strPv = fields[0];
+ pvInfo.strGroup = fields[1];
+ pvInfo.delta = Double.parseDouble(fields[2]);
+ pvInfo.strTag = fields[3];
+
+ PVEventRecorder recorder = new PVEventRecorder();
+ PVReader<VType> pvr = gpClient.read(pvInfo.strPv).
+ addListener(recorder).
+ addReadListener(this).
+ start();
+ LOGGER.info("Registered RT Pv: " + pvInfo.strPv);
+ pvInfo.pvr = pvr;
+ pvInfo.lastValue = null;
+ group.addPvReader(strKey, pvInfo);
+ }
+ };
+ }
+ }
+
+ @Override
+ public String getName() {
+ return "Merlot - Rt";
+ }
+
+ @Override
+ public void deleted(String pid) {
+ //
+ }
+
+ @Override
+ public void addGroup(String strGroup, String... args) {
+ if (null != args) {
+ if (null != args[0]) {
+ Integer period = Integer.parseInt(args[0]) * 1000;
+ ScheduleOptions schOptions =
scheduler.AT(Date.from(Instant.now()), -1, period);
+ schOptions.name(strGroup);
+
+ SchedulerGroup group = new SchedulerGroup(eventAdmin,
schOptions);
+ groups.put(strGroup, group);
+
+ try {
+ scheduler.schedule(group, schOptions);
+ } catch (Exception ex) {
+ LOGGER.error(ex.getMessage());
+ }
+
+ }
+ }
+ }
+
+ @Override
+ public void removeGroup(String strGroup) {
+ scheduler.unschedule(strGroup);
+ groups.remove(strGroup);
+ }
+
+ @Override
+ public void schedulerGroup(String strGroup, int scanTime) {
+
+
+ }
+
+ @Override
+ public void putPvRecord(String strPvName, String... args) {
+ if (null != args){
+ if (null != args[0]) {
+
+ }
+ }
+ }
+
+ @Override
+ public void removePvRecord(String strPvName) {
+
+ }
+
+ @Override
+ public void pvChanged(PVEvent event, PVReader pvReader) {
+ if (event.isType(PVEvent.Type.EXCEPTION)) {
+ LOGGER.info("EVENT: " + event.toString());
+ }
+
+ }
+
+ private class PVInfo {
+ public PVReader<VType> pvr;
+ public VNumber lastValue;
+ public String strPv;
+ public String strGroup;
+ public Double delta;
+ public String strTag;
+ }
+
+ private class SchedulerGroup implements Job {
+ private final EventAdmin eventAdmin;
+ private final ScheduleOptions schOptions;
+
+ private final Map<String, PVInfo> pvs = new ConcurrentHashMap<>();
+ private Map<String, String> properties = new Hashtable();
+ private VNumber value;
+
+ public SchedulerGroup(EventAdmin eventAdmin, ScheduleOptions
schOptions) {
+ this.eventAdmin = eventAdmin;
+ this.schOptions = schOptions;
+ }
+
+ @Override
+ public void execute(JobContext context) {
+ pvs.forEach(new BiConsumer<String, PVInfo>() {
+ @Override
+ public void accept(String s, PVInfo pv) {
+
+ if ((pv.pvr.isConnected()) && (!pv.pvr.isPaused())) {
+
+ value = (VNumber) pv.pvr.getValue();
+ if ((null == pv.lastValue) ||
!value.equals(pv.lastValue)) {
+
+ Double actualValue = ((VNumber)
value).getValue().doubleValue();
+ Double lastValue = (null == pv.lastValue)? 0 :
pv.lastValue.getValue().doubleValue();
+
+ if ((Math.abs(actualValue - lastValue)) >
pv.delta) {
+ pv.lastValue = value;
+ properties.clear();
+ properties.put("tag", pv.strTag);
+ properties.put("value",
value.getValue().toString());
+
+ EventProperties eventProps = new
EventProperties(properties);
+
+ Event decanterEvent = new Event(HTC_ROUTE,
properties);
+ eventAdmin.postEvent(decanterEvent);
+ }
+
+ }
+ }
+ }
+ });
+
+ }
+
+ public void addPvReader(final String strPVIndex, final PVInfo pvInfo){
+ pvs.put(strPVIndex, pvInfo);
+ }
+
+ public ScheduleOptions getScheduleOptions(){
+ return this.schOptions;
+ }
+
+ }
+
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml
new file mode 100644
index 0000000..12623ac
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.archive/src/main/resources/OSGI-INF/blueprint/decanter-service.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
default-activation="eager">
+
+
+ <reference
+ id="refSchedulerService"
+ interface="org.apache.plc4x.merlot.scheduler.api.Scheduler"
availability="mandatory" timeout="1200"/>
+
+ <reference
+ id="refEventAdminService"
+ interface="org.osgi.service.event.EventAdmin" availability="mandatory"
timeout="1200"/>
+
+ <bean id="MerlotGPClientBean"
+ class="org.apache.plc4x.merlot.archive.impl.MerlotGPClientImpl"
+ init-method="init"
+ destroy-method="destroy"
+ scope="singleton"
+ activation="eager">
+ </bean>
+
+ <bean id="MerlotMqttAppenderFactoryBean"
+
class="org.apache.plc4x.merlot.archive.impl.MerlotMqttAppenderFactory"
+ scope="singleton"
+ activation="eager">
+ </bean>
+
+ <bean id="MerlotDecanterManagedServiceBean"
+
class="org.apache.plc4x.merlot.archive.core.MerlotDecanterManagedService"
+ scope="singleton"
+ activation="eager">
+ <argument ref="blueprintBundleContext" />
+ </bean>
+
+ <bean id="MerlotPvHtcCollectorImplBean"
+
class="org.apache.plc4x.merlot.archive.impl.MerlotPvHtcCollectorImpl"
+ init-method="init"
+ scope="singleton"
+ activation="eager">
+ <argument ref="refSchedulerService" />
+ <argument ref="refEventAdminService" />
+ <argument ref="MerlotGPClientBean" />
+ </bean>
+
+ <bean id="MerlotPvRtCollectorImplBean"
+
class="org.apache.plc4x.merlot.archive.impl.MerlotPvRtCollectorImpl"
+ init-method="init"
+ scope="singleton"
+ activation="eager">
+ <argument ref="refSchedulerService" />
+ <argument ref="refEventAdminService" />
+ <argument ref="MerlotGPClientBean" />
+ </bean>
+
+ <bean id="MerlotDataBrowserSupportImplBean"
+
class="org.apache.plc4x.merlot.archive.impl.MerlotDataBrowserSupportImpl"
+ init-method="init"
+ scope="singleton"
+ activation="eager">
+ <argument ref="refSchedulerService" />
+ <argument ref="refEventAdminService" />
+ <argument ref="MerlotGPClientBean" />
+ </bean>
+
+ <service ref="MerlotGPClientBean" auto-export="interfaces">
+ </service>
+
+ <service ref="MerlotMqttAppenderFactoryBean" auto-export="interfaces">
+ <service-properties>
+ <entry key="org.plc4x.merlot.archive.factory"
value="mqtt-appender"/>
+ </service-properties>
+ </service>
+
+ <service ref="MerlotDecanterManagedServiceBean" auto-export="interfaces">
+ <service-properties>
+ <entry key="service.pid" value="org.apache.plc4x.merlot.archive"/>
+ <entry key="scheduler.name"
value="MerlotDecanterManagedServiceBean"/>
+ <entry key="scheduler.period" value="5000"/>
+ <entry key="scheduler.immediate" value="true"/>
+ <entry key="scheduler.concurrent" value="false"/>
+ </service-properties>
+ </service>
+
+ <service ref="MerlotPvHtcCollectorImplBean" auto-export="interfaces">
+ <service-properties>
+ <entry key="service.pid" value="org.apache.plc4x.merlot.pvhtc"/>
+ </service-properties>
+ </service>
+
+ <service ref="MerlotPvRtCollectorImplBean" auto-export="interfaces">
+ <service-properties>
+ <entry key="service.pid" value="org.apache.plc4x.merlot.pvrt"/>
+ </service-properties>
+ </service>
+
+ <service ref="MerlotDataBrowserSupportImplBean" auto-export="interfaces">
+ <service-properties>
+ <entry key="service.pid" value="org.apache.plc4x.merlot.archive"/>
+ </service-properties>
+ </service>
+
+</blueprint>
\ No newline at end of file
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.db/nbproject/project.properties
b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/nbproject/project.properties
new file mode 100644
index 0000000..e69de29
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java
b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java
new file mode 100644
index 0000000..227f74c
--- /dev/null
+++
b/plc4j/tools/merlot/org.apache.plc4x.merlot.db/src/test/java/org/apache/plc4x/merlot/db/core/DBTestSuite.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.plc4x.merlot.db.core;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ *
+ * @author cgarcia
+ */
+@RunWith(Suite.class)
[email protected]({
+ org.apache.plc4x.merlot.db.core.DBBooleanFactoryTest.class
+})
+public class DBTestSuite {
+
+}
diff --git
a/plc4j/tools/merlot/org.apache.plc4x.merlot.drv.s7/nbproject/project.properties
b/plc4j/tools/merlot/org.apache.plc4x.merlot.drv.s7/nbproject/project.properties
new file mode 100644
index 0000000..e69de29