This is an automated email from the ASF dual-hosted git repository.

cdutz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git

commit 851838e455cfe2fc59eae2e490428596a836a48e
Author: Christofer Dutz <christofer.d...@c-ware.de>
AuthorDate: Tue May 22 17:40:49 2018 +0200

    Updated the iot-factory example to use Elasticsearch to save data which can 
then be displayed with Kibana (for example)
    
    (cherry picked from commit 93ae433)
---
 examples/iot-factory/README.adoc                   |  66 ++++++
 examples/iot-factory/pom.xml                       |  14 +-
 .../iotfactory/IotElasticsearchFactory.java        | 224 +++++++++++++++++++++
 .../plc4x/java/examples/iotfactory/IotFactory.java |  86 --------
 examples/iot-factory/src/main/resources/log4j2.xml |  35 ++++
 5 files changed, 338 insertions(+), 87 deletions(-)

diff --git a/examples/iot-factory/README.adoc b/examples/iot-factory/README.adoc
new file mode 100644
index 0000000..869730f
--- /dev/null
+++ b/examples/iot-factory/README.adoc
@@ -0,0 +1,66 @@
+//
+//  Licensed to the Apache Software Foundation (ASF) under one or more
+//  contributor license agreements.  See the NOTICE file distributed with
+//  this work for additional information regarding copyright ownership.
+//  The ASF licenses this file to You under the Apache License, Version 2.0
+//  (the "License"); you may not use this file except in compliance with
+//  the License.  You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//
+
+== IoT Factory example
+
+In this example we will be running the full stack of tools for a rudimentary 
factory dashboard.
+
+This example however is everything but production ready.
+First of all we create a single-node Elasticsearch cluster embedded within the application's VM.
+Secondly we communicate with the `cluster` using the NodeClient, which is 
super-easy to use, but shouldn't be used in production type environments.
+
+We decided to go down this path because this way it's super-easy to set up everything, and running the example is nothing but starting a Java application.
+
+=== What else do I need?
+
+In General:
+- A PLC
+- A Factory
+
+Even if you might be able to get your hands on a PLC I doubt you have a spare 
factory available.
+
+This example is built to visualize a virtual factory simulated with 
https://factoryio.com/[Factory I/O].
+
+We have decided to stick with one of the on-board examples, to make it as easy 
as possible for you to reproduce.
+The example we used is: "Sorting by Height".
+
+Here's a video demonstrating this:
+https://www.youtube.com/watch?v=B0n8gT1vto4
+
+You can run this with a real PLC or use a PLC simulator.
+In our IoT lab we are currently running this with the real S7-1200.
+
+=== What happens in detail?
+
+When the application is started it first starts up an Elasticsearch node.
+
+After that is up and running, it checks if the indexes we are going to write 
to exist.
+
+If they don't and we would just have ES create them on the fly, we wouldn't be 
able to use the time-data in the time fields as they would be treated as 
numbers.
+
+Any non-existent indexes are created.
+
+After that an `Apache Edgent` instance is created and one stream is created to 
fetch the state of all 8 outputs of the PLC every 100ms.
+
+This stream is then split up.
+
+One branch simply dumps the data into the `factory-data` index.
+
+The second branch is a little more complex as it monitors the state of the left and right conveyors to extrapolate if a new `big` or `small` box is being transported.
+
+Only when a new box is detected an entry is written into the `product-data` 
index.
+
diff --git a/examples/iot-factory/pom.xml b/examples/iot-factory/pom.xml
index 900c83d..fcda208 100644
--- a/examples/iot-factory/pom.xml
+++ b/examples/iot-factory/pom.xml
@@ -33,7 +33,7 @@
   <description>Client application demonstrating PLC4X in our virtual IoT 
Factory.</description>
 
   <properties>
-    
<app.main.class>org.apache.plc4x.java.examples.iotfactory.IotFactory</app.main.class>
+    
<app.main.class>org.apache.plc4x.java.examples.iotfactory.IotElasticsearchFactory</app.main.class>
   </properties>
 
   <dependencies>
@@ -59,6 +59,18 @@
       <version>1.2.0</version>
     </dependency>
 
+    <!-- Elasticsearch dependencies -->
+    <dependency>
+      <groupId>org.elasticsearch</groupId>
+      <artifactId>elasticsearch</artifactId>
+      <version>6.2.4</version>
+    </dependency>
+    <dependency>
+      <groupId>org.elasticsearch.plugin</groupId>
+      <artifactId>transport-netty4-client</artifactId>
+      <version>6.2.4</version>
+    </dependency>
+
     <!-- Required driver implementation -->
     <dependency>
       <groupId>org.apache.plc4x</groupId>
diff --git 
a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
 
b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
new file mode 100644
index 0000000..1a0c225
--- /dev/null
+++ 
b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java
@@ -0,0 +1,224 @@
+package org.apache.plc4x.java.examples.iotfactory;
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.providers.direct.DirectProvider;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.plc4x.edgent.PlcConnectionAdapter;
+import org.apache.plc4x.edgent.PlcFunctions;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.XContentFactory;
+import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.node.NodeValidationException;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
+
+public class IotElasticsearchFactory {
+
+    /**
+     * Transfer state derived from the PLC output byte. It is used by the
+     * product detection in {@code runFactory()} to recognise the moment a new
+     * box starts being transferred.
+     */
+    public enum ConveyorState {
+        /** Neither transfer output (bit masks 8 and 16) is set. */
+        STOPPED,
+        /** Entered from STOPPED when the "transfer left" output (bit mask 8) is set. */
+        RUNNING_LEFT,
+        /** Entered from STOPPED when only the "transfer right" output (bit mask 16) is set. */
+        RUNNING_RIGHT
+    }
+
+    // State remembered between polls by the product-detection stream; starts as STOPPED.
+    private ConveyorState conveyorState = ConveyorState.STOPPED;
+
+    /**
+     * Creates and starts the Elasticsearch node that is embedded in this
+     * application. Transport and HTTP both use the Netty4 plugin, HTTP is
+     * enabled and "elasticsearch-data" is used as the node's home path.
+     *
+     * @return the started node.
+     * @throws NodeValidationException if starting the node fails validation.
+     */
+    private Node startElasticsearchNode() throws NodeValidationException {
+        Settings.Builder nodeSettings = Settings.builder();
+        nodeSettings.put("transport.type", "netty4");
+        nodeSettings.put("http.type", "netty4");
+        nodeSettings.put("http.enabled", "true");
+        nodeSettings.put("path.home", "elasticsearch-data");
+
+        // The Netty4 plugin is handed over explicitly as a classpath plugin.
+        Collection<Class<? extends Plugin>> classpathPlugins =
+            Collections.singletonList(Netty4Plugin.class);
+
+        Node embeddedNode = new MyNode(nodeSettings.build(), classpathPlugins);
+        embeddedNode.start();
+        return embeddedNode;
+    }
+
+    /**
+     * Makes sure the 'factory-data' and 'product-data' indexes exist before
+     * anything is written to them. Each index is created with an explicit
+     * mapping that declares the "time" field as a date.
+     *
+     * @param esClient client of the running Elasticsearch node.
+     * @throws Exception if the creation of an index is not acknowledged.
+     */
+    private void prepareIndexes(Client esClient) throws Exception {
+        IndicesAdminClient indicesAdminClient = esClient.admin().indices();
+
+        // Check if the factory-data index exists and create it, if it doesn't.
+        createIndexIfMissing(indicesAdminClient, "factory-data", "FactoryData",
+            "{\n" +
+                "            \"properties\": {\n" +
+                "                \"time\": {\n" +
+                "                    \"type\": \"date\"\n" +
+                "                }\n" +
+                "            }\n" +
+                "        }");
+
+        // Check if the product-data index exists and create it, if it doesn't.
+        createIndexIfMissing(indicesAdminClient, "product-data", "ProductData",
+            "{\n" +
+                "            \"properties\": {\n" +
+                "                \"time\": {\n" +
+                "                    \"type\": \"date\"\n" +
+                "                },\n" +
+                "                \"type\": {\n" +
+                "                    \"type\": \"keyword\"\n" +
+                "                }\n" +
+                "            }\n" +
+                "        }");
+    }
+
+    /**
+     * Creates a single index with the given mapping unless it already exists.
+     * The existence check, the create request and the error message all use
+     * the same {@code indexName}, so they cannot refer to different indexes.
+     *
+     * @param indicesAdminClient admin client used for the check and the creation.
+     * @param indexName name of the index to check and, if needed, create.
+     * @param typeName name of the mapping type registered on the new index.
+     * @param mappingJson JSON source of the mapping for {@code typeName}.
+     * @throws Exception if the creation of the index is not acknowledged.
+     */
+    private void createIndexIfMissing(IndicesAdminClient indicesAdminClient, String indexName,
+                                      String typeName, String mappingJson) throws Exception {
+        IndicesExistsRequest existsRequest = indicesAdminClient.prepareExists(indexName).request();
+        if (indicesAdminClient.exists(existsRequest).actionGet().isExists()) {
+            return;
+        }
+
+        CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName);
+        createIndexRequest.mapping(typeName, mappingJson, XContentType.JSON);
+        CreateIndexResponse createIndexResponse = indicesAdminClient.create(createIndexRequest).actionGet();
+        if (!createIndexResponse.isAcknowledged()) {
+            throw new Exception("Could not create index '" + indexName + "'");
+        }
+    }
+
+    /**
+     * Starts the embedded Elasticsearch node, makes sure the indexes exist and
+     * then builds and submits the Edgent topology. The topology polls the PLC
+     * output byte every 100ms and feeds two branches: one indexes every sample
+     * into 'factory-data', the other indexes a document into 'product-data'
+     * only when a new box is detected.
+     *
+     * @throws Exception if the node cannot be started, an index cannot be
+     *                   created or closing the PLC connection adapter fails.
+     */
+    private void runFactory() throws Exception {
+        // Start an Elasticsearch node.
+        Node esNode = startElasticsearchNode();
+        Client esClient = esNode.client();
+        System.out.println("Started Elasticsearch node on port 9200");
+
+        // Make sure the indexes exist prior to writing to them.
+        prepareIndexes(esClient);
+
+        // Get a plc connection.
+        try (PlcConnectionAdapter plcAdapter = new PlcConnectionAdapter("s7://10.10.64.20/1/1")) {
+            // Initialize the Edgent core.
+            DirectProvider dp = new DirectProvider();
+            Topology top = dp.newTopology();
+
+            // Define the event stream.
+            // 1) PLC4X source generating a stream of bytes.
+            Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, "OUTPUTS/0");
+            // 2) Use polling to get an item from the byte-stream in regular intervals.
+            TStream<Byte> plcOutputStates = top.poll(plcSupplier, 100, TimeUnit.MILLISECONDS);
+
+            // 3a) Create a stream that pumps all data into a 'factory-data' index.
+            TStream<XContentBuilder> factoryData = plcOutputStates.map(value -> buildFactoryDocument(value));
+            TStream<IndexResponse> factoryDataResponses = factoryData.map(
+                value -> esClient.prepareIndex("factory-data", "FactoryData").setSource(value).get());
+            factoryDataResponses.print();
+
+            // 3b) Create a stream that does some local analysis to detect big and small boxes and to only output
+            //     something to the 'product-data' index, if a new item is detected.
+            TStream<XContentBuilder> productData = plcOutputStates.map(value -> detectNewProduct(value));
+            TStream<IndexResponse> productDataResponses = productData.map(
+                value -> esClient.prepareIndex("product-data", "ProductData").setSource(value).get());
+            productDataResponses.print();
+
+            // Submit the topology and hereby start the event streams.
+            // NOTE(review): the adapter is closed when this try block exits, right after
+            // submit() returns - confirm this does not interfere with the running topology.
+            dp.submit(top);
+        }
+    }
+
+    /**
+     * Decodes the individual output bits of one PLC sample into a JSON document
+     * for the 'factory-data' index.
+     *
+     * @param outputs the polled output byte.
+     * @return the document, or {@code null} if building it failed (presumably the
+     *         stream's map step then emits nothing for this sample - TODO confirm).
+     */
+    private XContentBuilder buildFactoryDocument(byte outputs) {
+        try {
+            return XContentFactory.jsonBuilder()
+                .startObject()
+                .field("time", Calendar.getInstance().getTimeInMillis())
+                .field("conveyorEntry", (outputs & 1) != 0)
+                .field("load", (outputs & 2) != 0)
+                .field("unload", (outputs & 4) != 0)
+                .field("transferLeft", (outputs & 8) != 0)
+                .field("transferRight", (outputs & 16) != 0)
+                .field("conveyorLeft", (outputs & 32) != 0)
+                .field("conveyorRight", (outputs & 64) != 0)
+                .endObject();
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Advances the {@code conveyorState} state machine with one PLC sample and
+     * reports a new box exactly once, on the transition out of STOPPED.
+     *
+     * @param outputs the polled output byte.
+     * @return a 'product-data' document ("small" for a left transfer, "large"
+     *         for a right transfer) when a new box is detected, otherwise {@code null}.
+     */
+    private XContentBuilder detectNewProduct(byte outputs) {
+        boolean transferLeft = (outputs & 8) != 0;
+        boolean transferRight = (outputs & 16) != 0;
+
+        // No transfer active: (re-)arm the detection for the next box.
+        if (!(transferLeft || transferRight)) {
+            conveyorState = ConveyorState.STOPPED;
+            return null;
+        }
+        // A transfer is still in progress, this box has already been reported.
+        if (conveyorState != ConveyorState.STOPPED) {
+            return null;
+        }
+        // The state is updated before the document is built, so a failed build
+        // does not cause the same box to be reported again on the next poll.
+        if (transferLeft) {
+            conveyorState = ConveyorState.RUNNING_LEFT;
+            return buildProductDocument("small");
+        }
+        conveyorState = ConveyorState.RUNNING_RIGHT;
+        return buildProductDocument("large");
+    }
+
+    /**
+     * Builds a JSON document for the 'product-data' index.
+     *
+     * @param type value of the "type" field.
+     * @return the document, or {@code null} if building it failed.
+     */
+    private XContentBuilder buildProductDocument(String type) {
+        try {
+            return XContentFactory.jsonBuilder()
+                .startObject()
+                .field("time", Calendar.getInstance().getTimeInMillis())
+                .field("type", type)
+                .endObject();
+        } catch (IOException e) {
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Application entry point: creates the example and runs it.
+     *
+     * @param args command line arguments (not used).
+     * @throws Exception anything thrown while running the example.
+     */
+    public static void main(String[] args) throws Exception {
+        new IotElasticsearchFactory().runFactory();
+    }
+
+    /**
+     * Node subclass that lets this example pass classpath plugins to the node.
+     * NOTE(review): presumably needed because the matching {@code Node}
+     * constructor is not publicly accessible - confirm.
+     */
+    private static class MyNode extends Node {
+        // Prepares the environment from the given settings and hands it, together
+        // with the plugin classes, to the Node constructor.
+        private MyNode(Settings preparedSettings, Collection<Class<? extends 
Plugin>> classpathPlugins) {
+            
super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), 
classpathPlugins);
+        }
+    }
+
+}
diff --git 
a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java
 
b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java
deleted file mode 100644
index 91f6d26..0000000
--- 
a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package org.apache.plc4x.java.examples.iotfactory;/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-import org.apache.edgent.function.Supplier;
-import org.apache.edgent.providers.direct.DirectProvider;
-import org.apache.edgent.topology.TStream;
-import org.apache.edgent.topology.Topology;
-import org.apache.plc4x.edgent.PlcConnectionAdapter;
-import org.apache.plc4x.edgent.PlcFunctions;
-
-import java.util.concurrent.TimeUnit;
-
-public class IotFactory {
-
-    public enum ConveyorState {
-        STOPPED,
-        RUNNING_LEFT,
-        RUNNING_RIGHT
-    };
-
-    private static int smallBoxes = 0;
-    private static int largeBoxes = 0;
-    private static ConveyorState conveyorState = ConveyorState.STOPPED;
-
-
-    public static void main(String[] args) throws Exception {
-        // Get a plc connection.
-        try (PlcConnectionAdapter plcAdapter = new 
PlcConnectionAdapter("s7://10.10.64.20/1/1")) {
-            // Initialize the Edgent core.
-            DirectProvider dp = new DirectProvider();
-            Topology top = dp.newTopology();
-
-            // Define the event stream.
-            // 1) PLC4X source generating a stream of bytes.
-            Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, 
"OUTPUTS/0");
-            // 2) Use polling to get an item from the byte-stream in regular 
intervals.
-            TStream<Byte> source = top.poll(plcSupplier, 100, 
TimeUnit.MILLISECONDS);
-            // 3) Output the events in the stream on the console.
-
-            source.sink(value -> {
-                boolean runningLeft = (value & 8) != 0;
-                boolean runningRight = (value & 16) != 0;
-
-                if(conveyorState == ConveyorState.STOPPED) {
-                    if(runningLeft | runningRight) {
-                        if (runningLeft) {
-                            smallBoxes++;
-                            conveyorState = ConveyorState.RUNNING_LEFT;
-                            updateOutput();
-                        } else {
-                            largeBoxes++;
-                            conveyorState = ConveyorState.RUNNING_RIGHT;
-                            updateOutput();
-                        }
-                    }
-                } else if (!(runningLeft | runningRight)){
-                    conveyorState = ConveyorState.STOPPED;
-                }
-            });
-
-            // Submit the topology and hereby start the event streams.
-            dp.submit(top);
-        }
-    }
-
-    private static void updateOutput() {
-        System.out.println(String.format("Small Boxes: %3d, Large Boxes %3d", 
smallBoxes, largeBoxes));
-    }
-
-}
diff --git a/examples/iot-factory/src/main/resources/log4j2.xml 
b/examples/iot-factory/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..d82c7c3
--- /dev/null
+++ b/examples/iot-factory/src/main/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<!-- Logging setup for the iot-factory example: everything goes to the console. -->
+<!-- NOTE(review): "package" does not look like a documented Configuration attribute
+     ("packages" is) - confirm whether it is needed at all. -->
+<Configuration package="log4j.test"
+               status="WARN">
+  <Appenders>
+    <!-- Single appender writing to standard out. -->
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - 
%msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Netty's ResourceLeakDetector is limited to FATAL messages. -->
+    <Logger name="io.netty.util.ResourceLeakDetector" level="fatal">
+      <AppenderRef ref="Console"/>
+    </Logger>
+    <!-- All other loggers log at INFO and above. -->
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
cd...@apache.org.

Reply via email to