This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit eb74274a4e6b09f859470ddfda885b15e08cccba Author: Stefan Herrmann <[email protected]> AuthorDate: Thu Aug 15 14:07:22 2019 +0200 add first scraper implementation --- plc4j/integrations/logstash-plugin/pom.xml | 10 +- .../java/org/apache/plc4x/logstash/Plc4xInput.java | 108 ++++++++++++++------- ...vaInputExampleTest.java => Plc4xInputTest.java} | 4 +- 3 files changed, 82 insertions(+), 40 deletions(-) diff --git a/plc4j/integrations/logstash-plugin/pom.xml b/plc4j/integrations/logstash-plugin/pom.xml index 936044b..be05032 100644 --- a/plc4j/integrations/logstash-plugin/pom.xml +++ b/plc4j/integrations/logstash-plugin/pom.xml @@ -117,13 +117,13 @@ </dependency> <dependency> <groupId>org.apache.plc4x</groupId> - <artifactId>plc4x-tools-logstash-core</artifactId> - <version>${project.version}-${logstash.version}</version> + <artifactId>plc4j-scraper</artifactId> + <version>0.5.0-SNAPSHOT</version> </dependency> - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-lang3</artifactId> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4x-tools-logstash-core</artifactId> + <version>${project.version}-${logstash.version}</version> </dependency> </dependencies> diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java index 6f31d04..dc37936 100644 --- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java @@ -18,42 +18,49 @@ under the License. */ package org.apache.plc4x.logstash; -import co.elastic.logstash.api.Configuration; -import co.elastic.logstash.api.Context; -import co.elastic.logstash.api.Input; -import co.elastic.logstash.api.LogstashPlugin; -import co.elastic.logstash.api.PluginConfigSpec; +import co.elastic.logstash.api.*; import org.apache.plc4x.java.PlcDriverManager; -import org.apache.plc4x.java.api.PlcConnection; -import org.apache.plc4x.java.api.exceptions.PlcConnectionException; -import org.apache.plc4x.java.api.messages.PlcReadRequest; -import org.apache.plc4x.java.api.messages.PlcReadResponse; +import org.apache.plc4x.java.scraper.ResultHandler; +import org.apache.plc4x.java.scraper.config.triggeredscraper.JobConfigurationTriggeredImplBuilder; +import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImpl; +import org.apache.plc4x.java.scraper.config.triggeredscraper.ScraperConfigurationTriggeredImplBuilder; +import org.apache.plc4x.java.scraper.exception.ScraperException; +import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl; +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector; +import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl; +import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; // class name must match plugin name @LogstashPlugin(name="plc4x_input") public class Plc4xInput implements Input { - public static final PluginConfigSpec<Map<String, Object>> FIELDS_CONFIG = - PluginConfigSpec.hashSetting("fields"); + public static final PluginConfigSpec<List<Object>> QUERY_CONFIG = + PluginConfigSpec.arraySetting("queries"); public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG = PluginConfigSpec.requiredStringSetting("connection_string"); private final String connectionString; - private final Map<String, Object> fields; + private final List<Object> queries; private String id; - private PlcConnection plcConnection; + private PlcDriverManager plcDriverManager; + private TriggerCollector triggerCollector; + private TriggeredScraperImpl scraper; + + private final CountDownLatch done = new CountDownLatch(1); // all plugins must provide a constructor that accepts id, Configuration, and Context public Plc4xInput(String id, Configuration config, Context context) { // constructors should validate configuration options this.id = id; - fields = config.get(FIELDS_CONFIG); + queries = config.get(QUERY_CONFIG); connectionString = config.get(CONNECTION_STRING_CONFIG); } @@ -69,44 +76,79 @@ public class Plc4xInput implements Input { // a finite sequence of events should loop until that sequence is exhausted or until they // receive a stop request, whichever comes first. // Establish a connection to the plc using the url provided as first argument - try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionString)) { + ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder(); + //TODO: use multiple sources: + String connectionName = "connection1"; + builder.addSource(connectionName, connectionString); + + List<Object> jobConfigs = queries; - // Check if this connection support reading of data. - if (!plcConnection.getMetadata().canRead()) { - System.err.println("This connection doesn't support reading."); - return; + for (Object jobConfig : jobConfigs) { + if (!(jobConfig instanceof String)){ + System.err.println("Query String is not String!"); + continue; + } + String config = (String) jobConfig; + String[] jobConfigSegments = config.split(":"); + if(jobConfigSegments.length < 4) { + //TODO: use logging from logstash + System.out.println(String.format("Error in job configuration '%s'. " + + "The configuration expects at least 4 segments: " + + "{job-name}:{rate}(:{field-alias}#{field-address})+", jobConfig)); + continue; } - // Create a new read request: - PlcReadRequest.Builder builder = plcConnection.readRequestBuilder(); - for (String key: fields.keySet() - ) { - builder.addItem(key, fields.get(key).toString()); + String jobName = jobConfigSegments[0]; + Integer rate = Integer.valueOf(jobConfigSegments[1]); + JobConfigurationTriggeredImplBuilder jobBuilder = builder.job( + jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName); + for(int i = 3; i < jobConfigSegments.length; i++) { + String[] fieldSegments = jobConfigSegments[i].split("="); + if(fieldSegments.length != 2) { + System.err.println(String.format("Error in job configuration '%s'. " + + "The field segment expects a format {field-alias}#{field-address}, but got '%s'", + jobName, jobConfigSegments[i])); + continue; + } + String fieldAlias = fieldSegments[0]; + String fieldAddress = fieldSegments[1]; + jobBuilder.field(fieldAlias, fieldAddress); } - PlcReadRequest readRequest = builder.build(); + } - PlcReadResponse syncResponse = readRequest.execute().get(); - } catch (PlcConnectionException e) { - e.printStackTrace(); - } catch (Exception e) { - e.printStackTrace(); + ScraperConfigurationTriggeredImpl scraperConfig = builder.build(); + try { + plcDriverManager = new PooledPlcDriverManager(); + triggerCollector = new TriggerCollectorImpl(plcDriverManager); + scraper = new TriggeredScraperImpl(scraperConfig, new ResultHandler() { + @Override + public void handle(String jobName, String sourceName, Map<String, Object> results) { + //TODO: use jobname etc for multiple connections + consumer.accept(results); + } + }, triggerCollector); + scraper.start(); + triggerCollector.start(); + } catch (ScraperException e) { + System.err.println("Error starting the scraper: "+ e); } } @Override public void stop() { -// stopped = true; // set flag to request cooperative stop of input + triggerCollector.stop(); + scraper.stop(); } @Override public void awaitStop() throws InterruptedException { -// done.await(); // blocks until input has stopped + done.await(); // blocks until input has stopped } @Override public Collection<PluginConfigSpec<?>> configSchema() { // should return a list of all configuration options for this plugin - return Arrays.asList(FIELDS_CONFIG, CONNECTION_STRING_CONFIG); + return Arrays.asList(QUERY_CONFIG, CONNECTION_STRING_CONFIG); } @Override diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java similarity index 96% rename from plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java rename to plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java index 6d89749..16db66c 100644 --- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java +++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java @@ -31,11 +31,11 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; -public class JavaInputExampleTest { +public class Plc4xInputTest { @Test @Ignore - public void testJavaInputExample() { + public void testPlc4xInput() { String prefix = "This is message"; long eventCount = 5; Map<String, Object> configValues = new HashMap<>();
