This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 0dd5acbff1a9a2699dc00b788215fa082715fefa Author: Till Voss <[email protected]> AuthorDate: Fri Aug 23 08:15:35 2019 +0200 logstash adapter + test --- .../main/java/org/apache/plc4x/logstash/Plc4x.java | 57 +++++++++++++--------- .../org/apache/plc4x/logstash/Plc4xInputTest.java | 6 +-- 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java index 91e2c13..601f8e6 100644 --- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4x.java @@ -20,7 +20,6 @@ package org.apache.plc4x.logstash; import co.elastic.logstash.api.*; import org.apache.plc4x.java.PlcDriverManager; -import org.apache.plc4x.java.scraper.ResultHandler; import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder; import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl; import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder; @@ -30,16 +29,15 @@ import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.T import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl; import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; +import java.util.logging.Logger; // class name must match plugin name @LogstashPlugin(name="plc4x") public class Plc4x implements Input { + static Logger logger = Logger.getLogger(Plc4x.class.getName()); public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG = PluginConfigSpec.hashSetting("jobs"); @@ -80,8 +78,13 @@ public class Plc4x implements Input { //TODO: use multiple sources: for (String sourceName : sources.keySet()) { - //TODO: check ! - builder.addSource(sourceName, ((String) sources.get(sourceName))); + Object o = sources.get(sourceName); + if(o instanceof String) { + String source = (String)o; + builder.addSource(sourceName, source); + } else { + logger.severe("URL of source " + sourceName + "has the wrong typ!"); + } } for (String jobName : jobs.keySet()) { @@ -93,21 +96,16 @@ public class Plc4x implements Input { for (String source : ((List<String>) job.get("sources"))) { jobBuilder.source(source); } - for (String query : ((List<String>) job.get("queries"))) { - String[] fieldSegments = query.split("="); - if (fieldSegments.length != 2) { - System.err.println(String.format("Error in job configuration '%s'. " + - "The field segment expects a format {field-alias}={field-address}, but got '%s'", - jobName, query)); - continue; - } - String fieldAlias = fieldSegments[0]; - String fieldAddress = fieldSegments[1]; + Map<String, Object> queries = (Map<String, Object>) job.get("queries"); + for (String queryName : queries.keySet()) { + + String fieldAlias = queryName; + String fieldAddress = (String) queries.get(queryName); jobBuilder.field(fieldAlias, fieldAddress); } jobBuilder.build(); } else { - System.err.println("Jobs of wrong Type!"); + logger.severe("Jobs of wrong Type!"); } } @@ -115,17 +113,30 @@ public class Plc4x implements Input { try { plcDriverManager = new PooledPlcDriverManager(); triggerCollector = new TriggerCollectorImpl(plcDriverManager); - scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() { - @Override - public void handle(String jobName, String sourceName, Map<String, Object> results) { - //TODO: use jobname etc for multiple connections + scraper = new TriggeredScraperImpl(scraperConfig, (jobName, sourceName, results) -> { + + //TODO: use jobname etc for multiple connections + for (Map.Entry<String, Object> result : results.entrySet()) { + // Get field-name and -value from the results. + String fieldName = result.getKey(); + Object fieldValue = result.getValue(); + + System.out.println("fieldName: " + fieldName); + System.out.println("fieldValue: " + fieldValue); consumer.accept(results); } }, triggerCollector); scraper.start(); triggerCollector.start(); } catch (ScraperException e) { - System.err.println("Error starting the scraper: "+ e); + logger.severe("Error starting the scraper: "+ e); + } + while(true) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java index 024a43b..9cf3b44 100644 --- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java +++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java @@ -33,14 +33,15 @@ public class Plc4xInputTest { Map<String, Object> configValues = new HashMap<>(); Map<String, Object> jobValues = new HashMap<>(); - List<String> queries = Arrays.asList("testfield=RANDOM/foo:INTEGER"); + Map<String,Object> queries = new HashMap<>(); + queries.put("testfield", "ns=2;i=4"); List<String> sources = Arrays.asList("TestConnection"); jobValues.put("rate", 300); jobValues.put("queries", queries); jobValues.put("sources", sources); - configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "test:hurzpurzfurz")); + configValues.put(Plc4x.SOURCE_CONFIG.name(), Maps.newHashMap("TestConnection", "opcua:tcp://localhost:4840/freeopcua/server/")); configValues.put(Plc4x.JOB_CONFIG.name(), Maps.newHashMap("job1", jobValues)); @@ -68,5 +69,4 @@ public class Plc4xInputTest { return events; } } - }
