This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 76e27ee70ed07d96c2d6eb9f7123da88110ac12d Author: Stefan Herrmann <[email protected]> AuthorDate: Fri Aug 16 17:03:49 2019 +0200 add implementation for configuration --- plc4j/integrations/logstash-plugin/pom.xml | 7 ++- .../java/org/apache/plc4x/logstash/Plc4xInput.java | 58 ++++++++++------------ .../apache/plc4x/logstash/configuration/Job.java | 36 ++++++++++++++ .../plc4x/logstash/configuration/Source.java | 27 ++++++++++ .../org/apache/plc4x/logstash/Plc4xInputTest.java | 32 ++++++------ 5 files changed, 108 insertions(+), 52 deletions(-) diff --git a/plc4j/integrations/logstash-plugin/pom.xml b/plc4j/integrations/logstash-plugin/pom.xml index 329409e..c3b49c9 100644 --- a/plc4j/integrations/logstash-plugin/pom.xml +++ b/plc4j/integrations/logstash-plugin/pom.xml @@ -162,7 +162,12 @@ <artifactId>plc4x-tools-logstash-core</artifactId> <version>${project.version}-${logstash.version}</version> </dependency> - + <dependency> + <groupId>org.apache.plc4x</groupId> + <artifactId>plc4j-driver-simulated</artifactId> + <version>0.5.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java index dc37936..244ef9e 100644 --- a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java @@ -29,6 +29,8 @@ import org.apache.plc4x.java.scraper.triggeredscraper.TriggeredScraperImpl; import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollector; import org.apache.plc4x.java.scraper.triggeredscraper.triggerhandler.collector.TriggerCollectorImpl; import org.apache.plc4x.java.utils.connectionpool.PooledPlcDriverManager; +import org.apache.plc4x.logstash.configuration.Job; +import org.apache.plc4x.logstash.configuration.Source; import java.util.Arrays; import java.util.Collection; @@ -41,13 +43,13 @@ import java.util.function.Consumer; @LogstashPlugin(name="plc4x_input") public class Plc4xInput implements Input { - public static final PluginConfigSpec<List<Object>> QUERY_CONFIG = - PluginConfigSpec.arraySetting("queries"); + public static final PluginConfigSpec<Map<String, Object>> JOB_CONFIG = + PluginConfigSpec.hashSetting("jobs"); - public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG = - PluginConfigSpec.requiredStringSetting("connection_string"); - private final String connectionString; - private final List<Object> queries; + public static final PluginConfigSpec<Map<String, Object>> SOURCE_CONFIG = + PluginConfigSpec.hashSetting("sources"); + private final Map<String, Object> sources; + private final Map<String, Object> jobs; private String id; private PlcDriverManager plcDriverManager; @@ -60,8 +62,8 @@ public class Plc4xInput implements Input { public Plc4xInput(String id, Configuration config, Context context) { // constructors should validate configuration options this.id = id; - queries = config.get(QUERY_CONFIG); - connectionString = config.get(CONNECTION_STRING_CONFIG); + jobs = config.get(JOB_CONFIG); + sources = config.get(SOURCE_CONFIG); } @Override @@ -78,42 +80,32 @@ public class Plc4xInput implements Input { // Establish a connection to the plc using the url provided as first argument ScraperConfigurationTriggeredImplBuilder builder = new ScraperConfigurationTriggeredImplBuilder(); //TODO: use multiple sources: - String connectionName = "connection1"; - builder.addSource(connectionName, connectionString); - List<Object> jobConfigs = queries; - - for (Object jobConfig : jobConfigs) { - if (!(jobConfig instanceof String)){ - System.err.println("Query String is not String!"); - continue; - } - String config = (String) jobConfig; - String[] jobConfigSegments = config.split(":"); - if(jobConfigSegments.length < 4) { - //TODO: use logging from logstash - System.out.println(String.format("Error in job configuration '%s'. " + - "The configuration expects at least 4 segments: " + - "{job-name}:{rate}(:{field-alias}#{field-address})+", jobConfig)); - continue; - } + for (String sourceName : sources.keySet()) { + //TODO: check ! + builder.addSource(sourceName, ((Source) sources.get(sourceName)).getConnectionUri()); + } - String jobName = jobConfigSegments[0]; - Integer rate = Integer.valueOf(jobConfigSegments[1]); + for (String jobName : jobs.keySet()) { + Job job = ((Job) jobs.get(jobName)); JobConfigurationTriggeredImplBuilder jobBuilder = builder.job( - jobName, String.format("(SCHEDULED,%s)", rate)).source(connectionName); - for(int i = 3; i < jobConfigSegments.length; i++) { - String[] fieldSegments = jobConfigSegments[i].split("="); + jobName, String.format("(SCHEDULED,%s)", job.getRate())); + for (String source : job.getSources()) { + jobBuilder.source(source); + } + for (String query : job.getQueries()) { + String[] fieldSegments = query.split("="); if(fieldSegments.length != 2) { System.err.println(String.format("Error in job configuration '%s'. " + "The field segment expects a format {field-alias}#{field-address}, but got '%s'", - jobName, jobConfigSegments[i])); + jobName, query)); continue; } String fieldAlias = fieldSegments[0]; String fieldAddress = fieldSegments[1]; jobBuilder.field(fieldAlias, fieldAddress); } + jobBuilder.build(); } ScraperConfigurationTriggeredImpl scraperConfig = builder.build(); @@ -148,7 +140,7 @@ public class Plc4xInput implements Input { @Override public Collection<PluginConfigSpec<?>> configSchema() { // should return a list of all configuration options for this plugin - return Arrays.asList(QUERY_CONFIG, CONNECTION_STRING_CONFIG); + return Arrays.asList(JOB_CONFIG, SOURCE_CONFIG); } @Override diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java new file mode 100644 index 0000000..ee86bdb --- /dev/null +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Job.java @@ -0,0 +1,36 @@ +package org.apache.plc4x.logstash.configuration; + +import java.util.List; + +public class Job { + private final Integer rate; + private final List<String> queries; + private final List<String> sources; + + public Job(Integer rate, List<String> queries, List<String> sources) { + this.rate = rate; + this.queries = queries; + this.sources = sources; + } + + @Override + public String toString() { + return "Job{" + + "rate=" + rate + + ", queries=" + queries + + ", sources=" + sources + + '}'; + } + + public Integer getRate() { + return rate; + } + + public List<String> getQueries() { + return queries; + } + + public List<String> getSources() { + return sources; + } +} diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java new file mode 100644 index 0000000..b393b37 --- /dev/null +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/configuration/Source.java @@ -0,0 +1,27 @@ +package org.apache.plc4x.logstash.configuration; + +public class Source { + private final String connectionUri; + private final String sourceAlias; + + @Override + public String toString() { + return "Source{" + + "connectionUri='" + connectionUri + '\'' + + ", sourceAlias='" + sourceAlias + '\'' + + '}'; + } + + public String getConnectionUri() { + return connectionUri; + } + + public String getSourceAlias() { + return sourceAlias; + } + + public Source(String connectionUri, String sourceAlias) { + this.connectionUri = connectionUri; + this.sourceAlias = sourceAlias; + } +} diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java index 16db66c..4824e3c 100644 --- a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java +++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/Plc4xInputTest.java @@ -19,39 +19,35 @@ under the License. package org.apache.plc4x.logstash; import co.elastic.logstash.api.Configuration; -import org.apache.commons.lang3.StringUtils; -import org.junit.Assert; -import org.junit.Ignore; -import org.junit.Test; +import org.apache.plc4x.logstash.configuration.Job; +import org.apache.plc4x.logstash.configuration.Source; +import org.assertj.core.util.Maps; +import org.junit.jupiter.api.Test; import org.logstash.plugins.ConfigurationImpl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Consumer; public class Plc4xInputTest { @Test - @Ignore public void testPlc4xInput() { - String prefix = "This is message"; - long eventCount = 5; Map<String, Object> configValues = new HashMap<>(); -// configValues.put(Plc4xInput.PREFIX_CONFIG.name(), prefix); -// configValues.put(Plc4xInput.EVENT_COUNT_CONFIG.name(), eventCount); + + Job job1 = new Job(300, Arrays.asList("testfield=RANDOM/foo:INTEGER"), Arrays.asList("TestConnection")); + Source testConnection = new Source("test:hurzpurzfurz", "TestConnection"); + + configValues.put(Plc4xInput.SOURCE_CONFIG.name(), Maps.newHashMap(testConnection.getSourceAlias(), testConnection)); + configValues.put(Plc4xInput.JOB_CONFIG.name(), Maps.newHashMap("job1", job1)); + + Configuration config = new ConfigurationImpl(configValues); Plc4xInput input = new Plc4xInput("test-id", config, null); TestConsumer testConsumer = new TestConsumer(); input.start(testConsumer); List<Map<String, Object>> events = testConsumer.getEvents(); - Assert.assertEquals(eventCount, events.size()); - for (int k = 1; k <= events.size(); k++) { - Assert.assertEquals(prefix + " " + StringUtils.center(k + " of " + eventCount, 20), - events.get(k - 1).get("message")); - } + System.out.println(events.size()); } private static class TestConsumer implements Consumer<Map<String, Object>> {
