This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/plc4x.git
commit 242bffa1adc0fe2247b74e400e634a3266ad57ab Author: Stefan Herrmann <[email protected]> AuthorDate: Thu Aug 15 11:54:18 2019 +0200 initial source commit --- plc4j/integrations/logstash-plugin/.gitignore | 98 ----------------- plc4j/integrations/logstash-plugin/README.md | 9 ++ .../logstash-plugin/docs/index.asciidoc | 117 +++++++++++++++++++++ .../java/org/apache/plc4x/logstash/Plc4xInput.java | 102 ++++++++++++++++++ .../plc4x/logstash/JavaInputExampleTest.java | 53 ++++++++++ 5 files changed, 281 insertions(+), 98 deletions(-) diff --git a/plc4j/integrations/logstash-plugin/.gitignore b/plc4j/integrations/logstash-plugin/.gitignore deleted file mode 100644 index ddee199..0000000 --- a/plc4j/integrations/logstash-plugin/.gitignore +++ /dev/null @@ -1,98 +0,0 @@ -*.gem -*.rbc -/.config -/coverage/ -/InstalledFiles -/pkg/ -/spec/reports/ -/spec/examples.txt -/test/tmp/ -/test/version_tmp/ -/tmp/ - -# Used by dotenv library to load environment variables. -# .env - -# Ignore Byebug command history file. -.byebug_history - -## Specific to RubyMotion: -.dat* -.repl_history -build/ -*.bridgesupport -build-iPhoneOS/ -build-iPhoneSimulator/ - -## Specific to RubyMotion (use of CocoaPods): -# -# We recommend against adding the Pods directory to your .gitignore. However -# you should judge for yourself, the pros and cons are mentioned at: -# https://guides.cocoapods.org/using/using-cocoapods.html#should-i-check-the-pods-directory-into-source-control -# -# vendor/Pods/ - -## Documentation cache and generated files: -/.yardoc/ -/_yardoc/ -/doc/ -/rdoc/ - -## Environment normalization: -/.bundle/ -/vendor/bundle -/lib/bundler/man/ - -# for a library or gem, you might want to ignore these files since the code is -# intended to run in multiple environments; otherwise, check them in: -# Gemfile.lock -# .ruby-version -# .ruby-gemset - -# unless supporting rvm < 1.11.0 or doing something fancy, ignore this: -.rvmrc - -# Compiled class file -*.class - -# Log file -*.log - -# BlueJ files -*.ctxt - -# Mobile Tools for Java (J2ME) -.mtj.tmp/ - -# Package Files # -*.jar -*.war -*.nar -*.ear -*.zip -*.tar.gz -*.rar - -# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml -hs_err_pid* - -.idea - -.gradle/ -build/ -vendor/ -lib/ - -*.gemspec - -# Ignore Gradle GUI config -gradle-app.setting - -# Avoid ignoring Gradle wrapper jar file (.jar files are usually ignored) -!gradle-wrapper.jar - -# Cache of project -.gradletasknamecache - -# # Work around https://youtrack.jetbrains.com/issue/IDEA-116898 -# gradle/wrapper/gradle-wrapper.properties \ No newline at end of file diff --git a/plc4j/integrations/logstash-plugin/README.md b/plc4j/integrations/logstash-plugin/README.md new file mode 100644 index 0000000..1050718 --- /dev/null +++ b/plc4j/integrations/logstash-plugin/README.md @@ -0,0 +1,9 @@ +# Logstash Java Plugin + +[](https://travis-ci.org/logstash-plugins/logstash-filter-java_filter_example) + +This is a Java plugin for [Logstash](https://github.com/elastic/logstash). + +It is fully free and fully open source. The license is Apache 2.0, meaning you are free to use it however you want. + +The documentation for Logstash Java plugins is available [here](https://www.elastic.co/guide/en/logstash/6.7/contributing-java-plugin.html). diff --git a/plc4j/integrations/logstash-plugin/docs/index.asciidoc b/plc4j/integrations/logstash-plugin/docs/index.asciidoc new file mode 100644 index 0000000..cc47ac1 --- /dev/null +++ b/plc4j/integrations/logstash-plugin/docs/index.asciidoc @@ -0,0 +1,117 @@ +:plugin: example +:type: input +:default_codec: plain +// Update header with plugin name and default codec + +/////////////////////////////////////////// +START - GENERATED VARIABLES, DO NOT EDIT! +/////////////////////////////////////////// +:version: %VERSION% +:release_date: %RELEASE_DATE% +:changelog_url: %CHANGELOG_URL% +:include_path: ../../../../logstash/docs/include +/////////////////////////////////////////// +END - GENERATED VARIABLES, DO NOT EDIT! +/////////////////////////////////////////// + +[id="plugins-{type}s-{plugin}"] + +=== Example input plugin + +include::{include_path}/plugin_header.asciidoc[] + +==== Description + +Add plugin description here + +// Format anchors and links to support generated ids for versioning +// Sample anchor: [id="plugins-{type}s-{plugin}-setting_name"] +// Sample link: <<plugins-{type}s-{plugin}-setting_name>> + +[id="plugins-{type}s-{plugin}-options"] +==== Example Input Configuration Options + +[cols="<,<,<",options="header",] +|======================================================================= +|Setting |Input type|Required +| <<plugins-{type}s-{plugin}-a_setting_name>> |<<boolean,boolean>>|No +| <<plugins-{type}s-{plugin}-another_setting_name>> |<<hash,hash>>|No +| <<plugins-{type}s-{plugin}-setting_name_3>> |<<string,string>>|No +| <<plugins-{type}s-{plugin}-setting_name_4>> |<<number,number>>|No +| <<plugins-{type}s-{plugin}-setting_name_5>> |<<array,array>>|No +| <<plugins-{type}s-{plugin}-setting_name_6>> |<<bytes,bytes>>|No +| <<plugins-{type}s-{plugin}-setting_name_7>> |<<path,path>>|No +| <<plugins-{type}s-{plugin}-setting_name_8>> |<<password,password>>|No +|======================================================================= + +[id="plugins-{type}s-{plugin}-a_setting_name"] +===== `a_setting_name` + + * Value type is <<boolean,boolean>> + * Default value is `true` + +Add description here + +[id="plugins-{type}s-{plugin}-another_setting_name"] +===== `another_setting_name` + + * Value type is <<hash,hash>> + * Default value is `{}` + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_3"] +===== `setting_name_3` + + * Value type is <<string,string>> + * Default value is `{}` + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_4"] +===== `setting_name_4` + + * Value type is <<number,number>> + * Default value is `0` + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_5"] +===== `setting_name_5` + + * Value type is <<array,array>> + * Default value is {} + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_6"] +===== `setting_name_6` + + * Value type is <<bytes,bytes>> + * Default value is {} + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_7"] +===== `setting_name_7` + + * Value type is <<path,path>> + * Default value is {} + +Add description here + +[id="plugins-{type}s-{plugin}-setting_name_8"] +===== `setting_name_8` + + * Value type is <<password,password>> + * Default value is {} + +Add description here + +// The full list of Value Types is here: +// https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html + +[id="plugins-{type}s-{plugin}-common-options"] +include::{include_path}/{type}.asciidoc[] + +:default_codec!: diff --git a/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java new file mode 100644 index 0000000..fb77670 --- /dev/null +++ b/plc4j/integrations/logstash-plugin/src/main/java/org/apache/plc4x/logstash/Plc4xInput.java @@ -0,0 +1,102 @@ +package org.apache.plc4x.logstash; + +import co.elastic.logstash.api.Configuration; +import co.elastic.logstash.api.Context; +import co.elastic.logstash.api.Input; +import co.elastic.logstash.api.LogstashPlugin; +import co.elastic.logstash.api.PluginConfigSpec; +import org.apache.commons.lang3.StringUtils; +import org.apache.plc4x.java.PlcDriverManager; +import org.apache.plc4x.java.api.PlcConnection; +import org.apache.plc4x.java.api.exceptions.PlcConnectionException; +import org.apache.plc4x.java.api.messages.PlcReadRequest; +import org.apache.plc4x.java.api.messages.PlcReadResponse; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.function.Consumer; + +// class name must match plugin name +@LogstashPlugin(name="plc4x_input") +public class Plc4xInput implements Input { + + public static final PluginConfigSpec<Map<String, Object>> FIELDS_CONFIG = + PluginConfigSpec.hashSetting("fields"); + + public static final PluginConfigSpec<String> CONNECTION_STRING_CONFIG = + PluginConfigSpec.requiredStringSetting("connection_string"); + private final String connectionString; + private final Map<String, Object> fields; + + private String id; + private PlcConnection plcConnection; + + // all plugins must provide a constructor that accepts id, Configuration, and Context + public Plc4xInput(String id, Configuration config, Context context) { + // constructors should validate configuration options + this.id = id; + fields = config.get(FIELDS_CONFIG); + connectionString = config.get(CONNECTION_STRING_CONFIG); + } + + @Override + public void start(Consumer<Map<String, Object>> consumer) { + + // The start method should push Map<String, Object> instances to the supplied QueueWriter + // instance. Those will be converted to Event instances later in the Logstash event + // processing pipeline. + // + // Inputs that operate on unbounded streams of data or that poll indefinitely for new + // events should loop indefinitely until they receive a stop request. Inputs that produce + // a finite sequence of events should loop until that sequence is exhausted or until they + // receive a stop request, whichever comes first. + // Establish a connection to the plc using the url provided as first argument + try (PlcConnection plcConnection = new PlcDriverManager().getConnection(connectionString)) { + + // Check if this connection support reading of data. + if (!plcConnection.getMetadata().canRead()) { + System.err.println("This connection doesn't support reading."); + return; + } + + // Create a new read request: + PlcReadRequest.Builder builder = plcConnection.readRequestBuilder(); + for (String key: fields.keySet() + ) { + builder.addItem(key, fields.get(key).toString()); + } + PlcReadRequest readRequest = builder.build(); + + PlcReadResponse syncResponse = readRequest.execute().get(); + } catch (PlcConnectionException e) { + e.printStackTrace(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void stop() { + stopped = true; // set flag to request cooperative stop of input + } + + @Override + public void awaitStop() throws InterruptedException { + done.await(); // blocks until input has stopped + } + + @Override + public Collection<PluginConfigSpec<?>> configSchema() { + // should return a list of all configuration options for this plugin + return Arrays.asList(FIELDS_CONFIG, CONNECTION_STRING_CONFIG); + } + + @Override + public String getId() { + return this.id; + } +} diff --git a/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java new file mode 100644 index 0000000..19597e3 --- /dev/null +++ b/plc4j/integrations/logstash-plugin/src/test/java/org/apache/plc4x/logstash/JavaInputExampleTest.java @@ -0,0 +1,53 @@ +package org.apache.plc4x.logstash; + +import co.elastic.logstash.api.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.junit.Assert; +import org.junit.Test; +import org.logstash.plugins.ConfigurationImpl; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Consumer; + +public class JavaInputExampleTest { + + @Test + public void testJavaInputExample() { + String prefix = "This is message"; + long eventCount = 5; + Map<String, Object> configValues = new HashMap<>(); + configValues.put(Plc4xInput.PREFIX_CONFIG.name(), prefix); + configValues.put(Plc4xInput.EVENT_COUNT_CONFIG.name(), eventCount); + Configuration config = new ConfigurationImpl(configValues); + Plc4xInput input = new Plc4xInput("test-id", config, null); + TestConsumer testConsumer = new TestConsumer(); + input.start(testConsumer); + + List<Map<String, Object>> events = testConsumer.getEvents(); + Assert.assertEquals(eventCount, events.size()); + for (int k = 1; k <= events.size(); k++) { + Assert.assertEquals(prefix + " " + StringUtils.center(k + " of " + eventCount, 20), + events.get(k - 1).get("message")); + } + } + + private static class TestConsumer implements Consumer<Map<String, Object>> { + + private List<Map<String, Object>> events = new ArrayList<>(); + + @Override + public void accept(Map<String, Object> event) { + synchronized (this) { + events.add(event); + } + } + + public List<Map<String, Object>> getEvents() { + return events; + } + } + +}
