This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/plc4x.git

commit eb74274a4e6b09f859470ddfda885b15e08cccba
Author: Stefan Herrmann <[email protected]>
AuthorDate: Thu Aug 15 14:07:22 2019 +0200

    add first scraper implementation
---
 plc4j/integrations/logstash-plugin/pom.xml         |  10 +-
 .../java/org/apache/plc4x/logstash/Plc4xInput.java | 108 ++++++++++++++-------
 ...vaInputExampleTest.java => Plc4xInputTest.java} |   4 +-
 3 files changed, 82 insertions(+), 40 deletions(-)

diff --git a/plc4j/integrations/logstash-plugin/pom.xml 
b/plc4j/integrations/logstash-plugin/pom.xml
index 936044b..be05032 100644
--- a/plc4j/integrations/logstash-plugin/pom.xml
+++ b/plc4j/integrations/logstash-plugin/pom.xml
@@ -117,13 +117,13 @@
     </dependency>
     <dependency>
       <groupId>org.apache.plc4x</groupId>
-      <artifactId>plc4x-tools-logstash-core</artifactId>
-      <version>${project.version}-${logstash.version}</version>
+      <artifactId>plc4j-scraper</artifactId>
+      <version>0.5.0-SNAPSHOT</version>
     </dependency>
-
     <dependency>
-      <groupId>org.apache.commons</groupId>
-      <artifactId>commons-lang3</artifactId>
+      <groupId>org.apache.plc4x</groupId>
+      <artifactId>plc4x-tools-logstash-core</artifactId>
+      <version>${project.version}-${logstash.version}</version>
     </dependency>
   </dependencies>
 
diff --git 
a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
 
b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
index 6f31d04..dc37936 100644
--- 
a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
+++ 
b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java
@@ -18,42 +18,49 @@ under the License.
 */
 package org.apache.plc4x.logstash;
 
-import co.elastic.logstash.api.Configuration;
-import co.elastic.logstash.api.Context;
-import co.elastic.logstash.api.Input;
-import co.elastic.logstash.api.LogstashPlugin;
-import co.elastic.logstash.api.PluginConfigSpec;
+import co.elastic.logstash.api.*;
 import org.apache.plc4x.java.PlcDriverManager;
-import org.apache.plc4x.java.api.PlcConnection;
-import org.apache.plc4x.java.api.exceptions.PlcConnectionException;
-import org.apache.plc4x.java.api.messages.PlcReadRequest;
-import org.apache.plc4x.java.api.messages.PlcReadResponse;
+import org.apache.plc4x.java.scraper.ResultHandler;
+import 
org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder;
+import 
org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl;
+import 
org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder;
+import org.apache.plc4x.java.scraper.exception.ScraperException;
+import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl;
+import 
org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector;
+import 
org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl;
+import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 import java.util.function.Consumer;
 
 // class name must match plugin name
 @LogstashPlugin(name="plc4x_input")
 public class Plc4xInput implements Input {
 
-    public static final PluginConfigSpec<Map<String, Object>> FIELDS_CONFIG =
-            PluginConfigSpec.hashSetting("fields");
+    public static final PluginConfigSpec<List<Object>> QUERY_CONFIG =
+            PluginConfigSpec.arraySetting("queries");
 
     public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG =
             PluginConfigSpec.requiredStringSetting("connection_string");
     private final String connectionString;
-    private final Map<String, Object> fields;
+    private final List<Object> queries;
 
     private String id;
-    private PlcConnection plcConnection;
+    private PlcDriverManager plcDriverManager;
+    private TriggerCollector triggerCollector;
+    private TriggeredScraperImpl scraper;
+
+    private final CountDownLatch done = new CountDownLatch(1);
 
     // all plugins must provide a constructor that accepts id, Configuration, 
and Context
     public Plc4xInput(String id, Configuration config, Context context) {
         // constructors should validate configuration options
         this.id = id;
-        fields = config.get(FIELDS_CONFIG);
+        queries = config.get(QUERY_CONFIG);
         connectionString = config.get(CONNECTION_STRING_CONFIG);
     }
 
@@ -69,44 +76,79 @@ public class Plc4xInput implements Input {
         // a finite sequence of events should loop until that sequence is 
exhausted or until they
         // receive a stop request, whichever comes first.
         // Establish a connection to the plc using the url provided as first 
argument
-        try (PlcConnection plcConnection = new 
PlcDriverManager().getConnection(connectionString)) {
+        ScraperConfigurationTriggeredImplBuilder builder = new 
ScraperConfigurationTriggeredImplBuilder();
+        //TODO: use multiple sources:
+        String connectionName = "connection1";
+        builder.addSource(connectionName, connectionString);
+
+        List<Object> jobConfigs = queries;
 
-            // Check if this connection support reading of data.
-            if (!plcConnection.getMetadata().canRead()) {
-                System.err.println("This connection doesn't support reading.");
-                return;
+        for (Object jobConfig : jobConfigs) {
+            if (!(jobConfig instanceof String)){
+                System.err.println("Query String is not String!");
+                continue;
+            }
+            String config = (String) jobConfig;
+            String[] jobConfigSegments = config.split(":");
+            if(jobConfigSegments.length < 4) {
+                //TODO: use logging from logstash
+                System.out.println(String.format("Error in job configuration 
'%s'. " +
+                    "The configuration expects at least 4 segments: " +
+                    "{job-name}:{rate}(:{field-alias}#{field-address})+", 
jobConfig));
+                continue;
             }
 
-            // Create a new read request:
-            PlcReadRequest.Builder builder = 
plcConnection.readRequestBuilder();
-            for (String key: fields.keySet()
-                 ) {
-                builder.addItem(key, fields.get(key).toString());
+            String jobName = jobConfigSegments[0];
+            Integer rate = Integer.valueOf(jobConfigSegments[1]);
+            JobConfigurationTriggeredImplBuilder jobBuilder = builder.job(
+                jobName, String.format("(SCHEDULED,%s)", 
rate)).source(connectionName);
+            for(int i = 3; i < jobConfigSegments.length; i++) {
+                String[] fieldSegments = jobConfigSegments[i].split("=");
+                if(fieldSegments.length != 2) {
+                    System.err.println(String.format("Error in job 
configuration '%s'. " +
+                            "The field segment expects a format 
{field-alias}#{field-address}, but got '%s'",
+                        jobName, jobConfigSegments[i]));
+                    continue;
+                }
+                String fieldAlias = fieldSegments[0];
+                String fieldAddress = fieldSegments[1];
+                jobBuilder.field(fieldAlias, fieldAddress);
             }
-            PlcReadRequest readRequest = builder.build();
+        }
 
-            PlcReadResponse syncResponse = readRequest.execute().get();
-        } catch (PlcConnectionException e) {
-            e.printStackTrace();
-        } catch (Exception e) {
-            e.printStackTrace();
+        ScraperConfigurationTriggeredImpl scraperConfig = builder.build();
+        try {
+            plcDriverManager = new PooledPlcDriverManager();
+            triggerCollector = new TriggerCollectorImpl(plcDriverManager);
+            scraper = new TriggeredScraperImpl(scraperConfig, new 
ResultHandler() {
+                @Override
+                public void handle(String jobName, String sourceName, 
Map<String, Object> results) {
+                    //TODO: use jobname etc for multiple connections
+                    consumer.accept(results);
+                }
+            }, triggerCollector);
+            scraper.start();
+            triggerCollector.start();
+        } catch (ScraperException e) {
+            System.err.println("Error starting the scraper: "+ e);
         }
     }
 
     @Override
     public void stop() {
-//        stopped = true; // set flag to request cooperative stop of input
+        triggerCollector.stop();
+        scraper.stop();
     }
 
     @Override
     public void awaitStop() throws InterruptedException {
-//        done.await(); // blocks until input has stopped
+       done.await(); // blocks until input has stopped
     }
 
     @Override
     public Collection<PluginConfigSpec<?>> configSchema() {
         // should return a list of all configuration options for this plugin
-        return Arrays.asList(FIELDS_CONFIG, CONNECTION_STRING_CONFIG);
+        return Arrays.asList(QUERY_CONFIG, CONNECTION_STRING_CONFIG);
     }
 
     @Override
diff --git 
a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java
 
b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
similarity index 96%
rename from 
plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java
rename to 
plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
index 6d89749..16db66c 100644
--- 
a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java
+++ 
b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java
@@ -31,11 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-public class JavaInputExampleTest {
+public class Plc4xInputTest {
 
     @Test
     @Ignore
-    public void testJavaInputExample() {
+    public void testPlc4xInput() {
         String prefix = "This is message";
         long eventCount = 5;
         Map<String, Object> configValues = new HashMap<>();

Reply via email to