Repository: incubator-edgent Updated Branches: refs/heads/master a2a29a9d7 -> b1b433716
[Edgent-375] Add IotpDevice.httpEvents(...) Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/ede60042 Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/ede60042 Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/ede60042 Branch: refs/heads/master Commit: ede6004229886ff1ddf40db449076acb5794657a Parents: 261c7ee Author: Dale LaBossiere <dlab...@us.ibm.com> Authored: Mon Feb 20 17:59:26 2017 -0500 Committer: Dale LaBossiere <dlab...@us.ibm.com> Committed: Mon Feb 20 17:59:26 2017 -0500 ---------------------------------------------------------------------- .../edgent/connectors/iotp/IotpDevice.java | 39 +++++++++++++++ .../connectors/iotp/runtime/IotpConnector.java | 23 ++++++++- .../iotp/runtime/IotpDeviceHttpEventsFixed.java | 44 +++++++++++++++++ .../runtime/IotpDeviceHttpEventsFunction.java | 51 ++++++++++++++++++++ .../connectors/iotp/IotpQuickstart2.java | 21 +++++--- 5 files changed, 171 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java index 007d883..f3a61fe 100644 --- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java @@ -31,6 +31,8 @@ import org.apache.edgent.connectors.iotp.runtime.IotpConnector; import org.apache.edgent.connectors.iotp.runtime.IotpDeviceCommands; import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFixed; import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFunction; +import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFixed; +import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFunction; import org.apache.edgent.function.Function; import org.apache.edgent.function.UnaryOperator; import org.apache.edgent.topology.TSink; @@ -252,6 +254,43 @@ public class IotpDevice implements IotDevice { } /** + * Publish a stream's tuples as device events using the WIoTP HTTP protocol. + * <p> + * Each tuple is published as a device event with the supplied functions + * providing the event identifier and payload from the tuple. + * The event identifier and payload can be generated based upon the tuple. + * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}. + * + * @param stream + * Stream to be published. + * @param eventId + * function to supply the event identifier. + * @param payload + * function to supply the event's payload. + * @return TSink sink element representing termination of this stream. + */ + public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload) { + return stream.sink(new IotpDeviceHttpEventsFunction(connector, eventId, payload)); + } + + /** + * Publish a stream's tuples as device events using the WIoTP HTTP protocol. + * <p> + * Each tuple is published as a device event with the fixed event identifier. + * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}. + * + * @param stream + * Stream to be published. + * @param eventId + * Event identifier. + * @return TSink sink element representing termination of this stream. + */ + public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, String eventId) { + return stream.sink(new IotpDeviceHttpEventsFixed(connector, eventId)); + } + + /** * Create a stream of device commands as JSON objects. * Each command sent to the device matching {@code commands} will result in a tuple * on the stream. The JSON object has these keys: http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java index af16e61..998222f 100644 --- a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java @@ -24,8 +24,11 @@ import java.io.Serializable; import java.util.Properties; import org.apache.edgent.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.gson.JsonObject; +import com.ibm.iotf.client.api.APIClient.ContentType; import com.ibm.iotf.client.device.Command; import com.ibm.iotf.client.device.DeviceClient; @@ -34,6 +37,7 @@ import com.ibm.iotf.client.device.DeviceClient; */ public class IotpConnector implements Serializable, AutoCloseable { private static final long serialVersionUID = 1L; + private static final Logger logger = LoggerFactory.getLogger(IotpConnector.class); private Properties options; private File optionsFile; @@ -98,7 +102,24 @@ public class IotpConnector implements Serializable, AutoCloseable { throw new RuntimeException(e); } - client.publishEvent(eventId, event, qos); + boolean success = client.publishEvent(eventId, event, qos); + if (!success) { + // TODO log + } + } + + void publishHttpEvent(String eventId, JsonObject event) { + try { + DeviceClient client = getClient(); + client.api().publishDeviceEventOverHTTP(eventId, event, ContentType.json); + } catch (Exception e) { + // throw new RuntimeException(e); + // If the publish throws, a RuntimeException will cause + // everything to unwind and the app/topology can terminate. + // See the commentary/impl of MqttPublisher.accept(). + // See EDGENT-382 + logger.error("Unable to publish tuple for event " + eventId, e); + } } @Override http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java new file mode 100644 index 0000000..ea640b9 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java @@ -0,0 +1,44 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes stream tuples as IoTf device events. + * + */ +public class IotpDeviceHttpEventsFixed implements Consumer<JsonObject> { + private static final long serialVersionUID = 1L; + private final IotpConnector connector; + private final String eventId; + + public IotpDeviceHttpEventsFixed(IotpConnector connector, String eventId) { + this.connector = connector; + this.eventId = eventId; + } + + @Override + public void accept(JsonObject event) { + connector.publishHttpEvent(eventId, event); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java ---------------------------------------------------------------------- diff --git a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java new file mode 100644 index 0000000..5470bf3 --- /dev/null +++ b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java @@ -0,0 +1,51 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +*/ + +package org.apache.edgent.connectors.iotp.runtime; + +import org.apache.edgent.function.Consumer; +import org.apache.edgent.function.Function; +import org.apache.edgent.function.UnaryOperator; + +import com.google.gson.JsonObject; + +/** + * Consumer that publishes stream tuples as IoTf device events with a fixed + * event identifier and qos. + * + */ +public class IotpDeviceHttpEventsFunction implements Consumer<JsonObject> { + + private static final long serialVersionUID = 1L; + private final IotpConnector connector; + private final Function<JsonObject, String> eventId; + private UnaryOperator<JsonObject> payload; + + public IotpDeviceHttpEventsFunction(IotpConnector connector, Function<JsonObject, String> eventId, + UnaryOperator<JsonObject> payload) { + this.connector = connector; + this.payload = payload; + this.eventId = eventId; + } + + @Override + public void accept(JsonObject event) { + connector.publishHttpEvent(eventId.apply(event), payload.apply(event)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java ---------------------------------------------------------------------- diff --git a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java index eff81c2..9a41601 100644 --- a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java +++ b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java @@ -18,11 +18,12 @@ under the License. */ package org.apache.edgent.samples.connectors.iotp; +import java.util.Arrays; +import java.util.List; import java.util.Properties; import java.util.Random; import java.util.concurrent.TimeUnit; -import org.apache.edgent.connectors.iot.IotDevice; import org.apache.edgent.connectors.iot.QoS; import org.apache.edgent.connectors.iotp.IotpDevice; import org.apache.edgent.providers.direct.DirectProvider; @@ -49,12 +50,14 @@ import com.ibm.iotf.devicemgmt.device.ManagedDevice; * as it received by the Quickstart service. * <P> * This sample demonstrates using the WIoTP API to initialize the IotpDevice - * connector. + * connector as well as the ability to publish events using the WIoTP HTTP protocol. */ public class IotpQuickstart2 { public static void main(String[] args) throws Exception { - boolean useDeviceClient = args.length > 0 && args[0].equals("useDeviceClient"); + List<String> argList = Arrays.asList(args); + boolean useDeviceClient = argList.contains("useDeviceClient"); + boolean useHttp = argList.contains("useHttp"); DirectProvider tp = new DirectProvider(); Topology topology = tp.newTopology("IotpQuickstart"); @@ -65,7 +68,7 @@ public class IotpQuickstart2 { options.setProperty("org", "quickstart"); options.setProperty("type", IotpDevice.QUICKSTART_DEVICE_TYPE); options.setProperty("id", deviceId); - IotDevice device; + IotpDevice device; if (useDeviceClient) { System.out.println("Using WIoTP DeviceClient"); device = new IotpDevice(topology, new DeviceClient(options)); @@ -99,8 +102,14 @@ public class IotpQuickstart2 { j.addProperty("objectTemp", v[2]); return j; }); - - device.events(json, "sensors", QoS.FIRE_AND_FORGET); + + if (!useHttp) { + device.events(json, "sensors", QoS.FIRE_AND_FORGET); + } + else { + System.out.println("Publishing events using HTTP"); + device.httpEvents(json, "sensors"); + } tp.submit(topology); }