This is an automated email from the ASF dual-hosted git repository. cdutz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-plc4x.git
commit 851838e455cfe2fc59eae2e490428596a836a48e Author: Christofer Dutz <christofer.d...@c-ware.de> AuthorDate: Tue May 22 17:40:49 2018 +0200 Updated the iot-factory example to use Elasticsearch to save data which can then be displayed with Kibana (for example) (cherry picked from commit 93ae433) --- examples/iot-factory/README.adoc | 66 ++++++ examples/iot-factory/pom.xml | 14 +- .../iotfactory/IotElasticsearchFactory.java | 224 +++++++++++++++++++++ .../plc4x/java/examples/iotfactory/IotFactory.java | 86 -------- examples/iot-factory/src/main/resources/log4j2.xml | 35 ++++ 5 files changed, 338 insertions(+), 87 deletions(-) diff --git a/examples/iot-factory/README.adoc b/examples/iot-factory/README.adoc new file mode 100644 index 0000000..869730f --- /dev/null +++ b/examples/iot-factory/README.adoc @@ -0,0 +1,66 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +== IoT Factory example + +In this example we will be running the full stack of tools for a rudimentary factory dashboard. + +This example, however, is anything but production-ready. +First of all, we create an Elasticsearch single-node cluster embedded within the application's VM. 
+Secondly, we communicate with the `cluster` using the NodeClient, which is super-easy to use, but shouldn't be used in production-type environments. + +We decided to go this path because this way it's super-easy to set everything up, and running the example is nothing but starting a Java application. + +=== What else do I need? + +In general: +- A PLC +- A Factory + +Even if you might be able to get your hands on a PLC, I doubt you have a spare factory available. + +This example is built to visualize a virtual factory simulated with https://factoryio.com/[Factory I/O]. + +We have decided to stick with one of the on-board examples, to make it as easy as possible for you to reproduce. +The example we used is: "Sorting by Height". + +Here's a video demonstrating this: +https://www.youtube.com/watch?v=B0n8gT1vto4 + +You can run this with a real PLC or use a PLC simulator. +In our IoT lab we are currently running this with a real S7-1200. + +=== What happens in detail? + +When the application is started, it first starts up an Elasticsearch node. + +After that is up and running, it checks if the indexes we are going to write to exist. + +If they don't exist and we just let ES create them on the fly, we wouldn't be able to use the time data in the time fields, as they would be treated as numbers. + +Any non-existent indexes are created. + +After that an `Apache Edgent` instance is created and one stream is created to fetch the state of all 8 outputs of the PLC every 100ms. + +This stream is then split up. + +One branch simply dumps the data into the `factory-data` index. + +The second branch is a little more complex, as it monitors the state of the left and right conveyors to determine whether a new `big` or `small` box is being transported. + +Only when a new box is detected is an entry written into the `product-data` index. 
+ diff --git a/examples/iot-factory/pom.xml b/examples/iot-factory/pom.xml index 900c83d..fcda208 100644 --- a/examples/iot-factory/pom.xml +++ b/examples/iot-factory/pom.xml @@ -33,7 +33,7 @@ <description>Client application demonstrating PLC4X in our virtual IoT Factory.</description> <properties> - <app.main.class>org.apache.plc4x.java.examples.iotfactory.IotFactory</app.main.class> + <app.main.class>org.apache.plc4x.java.examples.iotfactory.IotElasticsearchFactory</app.main.class> </properties> <dependencies> @@ -59,6 +59,18 @@ <version>1.2.0</version> </dependency> + <!-- Elasticsearch dependencies --> + <dependency> + <groupId>org.elasticsearch</groupId> + <artifactId>elasticsearch</artifactId> + <version>6.2.4</version> + </dependency> + <dependency> + <groupId>org.elasticsearch.plugin</groupId> + <artifactId>transport-netty4-client</artifactId> + <version>6.2.4</version> + </dependency> + <!-- Required driver implementation --> <dependency> <groupId>org.apache.plc4x</groupId> diff --git a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java new file mode 100644 index 0000000..1a0c225 --- /dev/null +++ b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotElasticsearchFactory.java @@ -0,0 +1,224 @@ +package org.apache.plc4x.java.examples.iotfactory; +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +import org.apache.edgent.function.Supplier; +import org.apache.edgent.providers.direct.DirectProvider; +import org.apache.edgent.topology.TStream; +import org.apache.edgent.topology.Topology; +import org.apache.plc4x.edgent.PlcConnectionAdapter; +import org.apache.plc4x.edgent.PlcFunctions; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; +import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; +import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.node.NodeValidationException; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; + +import java.io.IOException; +import java.util.Calendar; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +public class IotElasticsearchFactory { + + public enum ConveyorState { + STOPPED, + RUNNING_LEFT, + RUNNING_RIGHT + } + + private ConveyorState conveyorState = ConveyorState.STOPPED; + + private Node startElasticsearchNode() throws NodeValidationException { + Node node = new 
MyNode(Settings.builder() + .put("transport.type", "netty4") + .put("http.type", "netty4") + .put("http.enabled", "true") + .put("path.home", "elasticsearch-data") + .build(), Collections.singletonList(Netty4Plugin.class)); + node.start(); + return node; + } + + private void prepareIndexes(Client esClient) throws Exception { + IndicesAdminClient indicesAdminClient = esClient.admin().indices(); + + // Check if the factory-data index exists and create it, if it doesn't. + IndicesExistsRequest factoryDataIndexExistsRequest = + indicesAdminClient.prepareExists("product-data").request(); + if(!indicesAdminClient.exists(factoryDataIndexExistsRequest).actionGet().isExists()) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest("factory-data"); + createIndexRequest.mapping("FactoryData", + "{\n" + + " \"properties\": {\n" + + " \"time\": {\n" + + " \"type\": \"date\"\n" + + " }\n" + + " }\n" + + " }", XContentType.JSON); + CreateIndexResponse createIndexResponse = indicesAdminClient.create(createIndexRequest).actionGet(); + if(!createIndexResponse.isAcknowledged()) { + throw new Exception("Could not create index 'product-data'"); + } + } + + // Check if the product-data index exists and create it, if it doesn't. 
+ IndicesExistsRequest productDataIndexExistsRequest = + indicesAdminClient.prepareExists("product-data").request(); + if(!indicesAdminClient.exists(productDataIndexExistsRequest).actionGet().isExists()) { + CreateIndexRequest createIndexRequest = new CreateIndexRequest("product-data"); + createIndexRequest.mapping("ProductData", + "{\n" + + " \"properties\": {\n" + + " \"time\": {\n" + + " \"type\": \"date\"\n" + + " },\n" + + " \"type\": {\n" + + " \"type\": \"keyword\"\n" + + " }\n" + + " }\n" + + " }", XContentType.JSON); + CreateIndexResponse createIndexResponse = indicesAdminClient.create(createIndexRequest).actionGet(); + if(!createIndexResponse.isAcknowledged()) { + throw new Exception("Could not create index 'product-data'"); + } + } + } + + private void runFactory() throws Exception { + // Start an Elasticsearch node. + Node esNode = startElasticsearchNode(); + Client esClient = esNode.client(); + System.out.println("Started Elasticsearch node on port 9200"); + + // Make sure the indexes exist prior to writing to them. + prepareIndexes(esClient); + + // Get a plc connection. + try (PlcConnectionAdapter plcAdapter = new PlcConnectionAdapter("s7://10.10.64.20/1/1")) { + // Initialize the Edgent core. + DirectProvider dp = new DirectProvider(); + Topology top = dp.newTopology(); + + // Define the event stream. + // 1) PLC4X source generating a stream of bytes. + Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, "OUTPUTS/0"); + // 2) Use polling to get an item from the byte-stream in regular intervals. + TStream<Byte> plcOutputStates = top.poll(plcSupplier, 100, TimeUnit.MILLISECONDS); + + // 3a) Create a stream that pumps all data into a 'factory-data' index. 
+ TStream<XContentBuilder> factoryData = plcOutputStates.map(value -> { + boolean conveyorEntry = (value & 1) != 0; + boolean load = (value & 2) != 0; + boolean unload = (value & 4) != 0; + boolean transferLeft = (value & 8) != 0; + boolean transferRight = (value & 16) != 0; + boolean conveyorLeft = (value & 32) != 0; + boolean conveyorRight = (value & 64) != 0; + + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("time", Calendar.getInstance().getTimeInMillis()) + .field("conveyorEntry", conveyorEntry) + .field("load", load) + .field( "unload", unload) + .field( "transferLeft", transferLeft) + .field( "transferRight", transferRight) + .field( "conveyorLeft", conveyorLeft) + .field( "conveyorRight", conveyorRight) + .endObject(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + }); + TStream<IndexResponse> factoryDataResponses = factoryData.map( + value -> esClient.prepareIndex("factory-data", "FactoryData").setSource(value).get()); + factoryDataResponses.print(); + + // 3b) Create a stream that does some local analysis to detect big and small boxes and to only output + // something to the 'product-data' index, if a new item is detected. 
+ TStream<XContentBuilder> productData = plcOutputStates.map(value -> { + boolean transferLeft = (value & 8) != 0; + boolean transferRight = (value & 16) != 0; + + if (conveyorState == ConveyorState.STOPPED) { + if (transferLeft | transferRight) { + if (transferLeft) { + conveyorState = ConveyorState.RUNNING_LEFT; + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("time", Calendar.getInstance().getTimeInMillis()) + .field("type", "small") + .endObject(); + } catch (IOException e) { + e.printStackTrace(); + } + } else { + conveyorState = ConveyorState.RUNNING_RIGHT; + try { + return XContentFactory.jsonBuilder() + .startObject() + .field("time", Calendar.getInstance().getTimeInMillis()) + .field("type", "large") + .endObject(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + } else if (!(transferLeft | transferRight)) { + conveyorState = ConveyorState.STOPPED; + } + return null; + }); + TStream<IndexResponse> productDataResponses = productData.map( + value -> esClient.prepareIndex("product-data", "ProductData").setSource(value).get()); + productDataResponses.print(); + + // Submit the topology and hereby start the event streams. + dp.submit(top); + } + } + + public static void main(String[] args) throws Exception { + IotElasticsearchFactory factory = new IotElasticsearchFactory(); + factory.runFactory(); + } + + private static class MyNode extends Node { + private MyNode(Settings preparedSettings, Collection<Class<? 
extends Plugin>> classpathPlugins) { + super(InternalSettingsPreparer.prepareEnvironment(preparedSettings, null), classpathPlugins); + } + } + +} diff --git a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java b/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java deleted file mode 100644 index 91f6d26..0000000 --- a/examples/iot-factory/src/main/java/org/apache/plc4x/java/examples/iotfactory/IotFactory.java +++ /dev/null @@ -1,86 +0,0 @@ -package org.apache.plc4x.java.examples.iotfactory;/* -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, -software distributed under the License is distributed on an -"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -KIND, either express or implied. See the License for the -specific language governing permissions and limitations -under the License. 
-*/ - -import org.apache.edgent.function.Supplier; -import org.apache.edgent.providers.direct.DirectProvider; -import org.apache.edgent.topology.TStream; -import org.apache.edgent.topology.Topology; -import org.apache.plc4x.edgent.PlcConnectionAdapter; -import org.apache.plc4x.edgent.PlcFunctions; - -import java.util.concurrent.TimeUnit; - -public class IotFactory { - - public enum ConveyorState { - STOPPED, - RUNNING_LEFT, - RUNNING_RIGHT - }; - - private static int smallBoxes = 0; - private static int largeBoxes = 0; - private static ConveyorState conveyorState = ConveyorState.STOPPED; - - - public static void main(String[] args) throws Exception { - // Get a plc connection. - try (PlcConnectionAdapter plcAdapter = new PlcConnectionAdapter("s7://10.10.64.20/1/1")) { - // Initialize the Edgent core. - DirectProvider dp = new DirectProvider(); - Topology top = dp.newTopology(); - - // Define the event stream. - // 1) PLC4X source generating a stream of bytes. - Supplier<Byte> plcSupplier = PlcFunctions.byteSupplier(plcAdapter, "OUTPUTS/0"); - // 2) Use polling to get an item from the byte-stream in regular intervals. - TStream<Byte> source = top.poll(plcSupplier, 100, TimeUnit.MILLISECONDS); - // 3) Output the events in the stream on the console. - - source.sink(value -> { - boolean runningLeft = (value & 8) != 0; - boolean runningRight = (value & 16) != 0; - - if(conveyorState == ConveyorState.STOPPED) { - if(runningLeft | runningRight) { - if (runningLeft) { - smallBoxes++; - conveyorState = ConveyorState.RUNNING_LEFT; - updateOutput(); - } else { - largeBoxes++; - conveyorState = ConveyorState.RUNNING_RIGHT; - updateOutput(); - } - } - } else if (!(runningLeft | runningRight)){ - conveyorState = ConveyorState.STOPPED; - } - }); - - // Submit the topology and hereby start the event streams. 
-            dp.submit(top);
-        }
-    }
-
-    private static void updateOutput() {
-        System.out.println(String.format("Small Boxes: %3d, Large Boxes %3d", smallBoxes, largeBoxes));
-    }
-
-}
diff --git a/examples/iot-factory/src/main/resources/log4j2.xml b/examples/iot-factory/src/main/resources/log4j2.xml
new file mode 100644
index 0000000..d82c7c3
--- /dev/null
+++ b/examples/iot-factory/src/main/resources/log4j2.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<Configuration status="WARN">
+  <Appenders>
+    <Console name="Console" target="SYSTEM_OUT">
+      <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"/>
+    </Console>
+  </Appenders>
+  <Loggers>
+    <!-- Netty leak detector: fatal only; additivity="false" so events are not also printed via Root. -->
+    <Logger name="io.netty.util.ResourceLeakDetector" level="fatal" additivity="false">
+      <AppenderRef ref="Console"/>
+    </Logger>
+    <Root level="info">
+      <AppenderRef ref="Console"/>
+    </Root>
+  </Loggers>
+</Configuration>
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact cd...@apache.org.