Repository: incubator-edgent
Updated Branches:
  refs/heads/master a2a29a9d7 -> b1b433716


[Edgent-375]  Add IotpDevice.httpEvents(...)

Project: http://git-wip-us.apache.org/repos/asf/incubator-edgent/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-edgent/commit/ede60042
Tree: http://git-wip-us.apache.org/repos/asf/incubator-edgent/tree/ede60042
Diff: http://git-wip-us.apache.org/repos/asf/incubator-edgent/diff/ede60042

Branch: refs/heads/master
Commit: ede6004229886ff1ddf40db449076acb5794657a
Parents: 261c7ee
Author: Dale LaBossiere <dlab...@us.ibm.com>
Authored: Mon Feb 20 17:59:26 2017 -0500
Committer: Dale LaBossiere <dlab...@us.ibm.com>
Committed: Mon Feb 20 17:59:26 2017 -0500

----------------------------------------------------------------------
 .../edgent/connectors/iotp/IotpDevice.java      | 39 +++++++++++++++
 .../connectors/iotp/runtime/IotpConnector.java  | 23 ++++++++-
 .../iotp/runtime/IotpDeviceHttpEventsFixed.java | 44 +++++++++++++++++
 .../runtime/IotpDeviceHttpEventsFunction.java   | 51 ++++++++++++++++++++
 .../connectors/iotp/IotpQuickstart2.java        | 21 +++++---
 5 files changed, 171 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
----------------------------------------------------------------------
diff --git 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
index 007d883..f3a61fe 100644
--- 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
+++ 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/IotpDevice.java
@@ -31,6 +31,8 @@ import 
org.apache.edgent.connectors.iotp.runtime.IotpConnector;
 import org.apache.edgent.connectors.iotp.runtime.IotpDeviceCommands;
 import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFixed;
 import org.apache.edgent.connectors.iotp.runtime.IotpDeviceEventsFunction;
+import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFixed;
+import org.apache.edgent.connectors.iotp.runtime.IotpDeviceHttpEventsFunction;
 import org.apache.edgent.function.Function;
 import org.apache.edgent.function.UnaryOperator;
 import org.apache.edgent.topology.TSink;
@@ -252,6 +254,43 @@ public class IotpDevice implements IotDevice {
     }
 
     /**
+     * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
+     * <p>
+     * Each tuple is published as a device event with the supplied functions
+     * providing the event identifier and payload from the tuple.
+     * The event identifier and payload can be generated based upon the tuple.
+     * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
+     * 
+     * @param stream
+     *            Stream to be published.
+     * @param eventId
+     *            function to supply the event identifier.
+     * @param payload
+     *            function to supply the event's payload.
+     * @return TSink sink element representing termination of this stream.
+     */
+    public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, 
Function<JsonObject, String> eventId,
+        UnaryOperator<JsonObject> payload) {
+      return stream.sink(new IotpDeviceHttpEventsFunction(connector, eventId, 
payload));
+    }
+    
+    /**
+     * Publish a stream's tuples as device events using the WIoTP HTTP protocol.
+     * <p>
+     * Each tuple is published as a device event with the fixed event identifier.
+     * The event is published with the equivalent of {@link QoS#AT_MOST_ONCE}.
+     * 
+     * @param stream
+     *            Stream to be published.
+     * @param eventId
+     *            Event identifier.
+     * @return TSink sink element representing termination of this stream.
+     */
+    public TSink<JsonObject> httpEvents(TStream<JsonObject> stream, String 
eventId) {
+      return stream.sink(new IotpDeviceHttpEventsFixed(connector, eventId));
+    }
+
+    /**
      * Create a stream of device commands as JSON objects.
      * Each command sent to the device matching {@code commands} will result 
in a tuple
      * on the stream. The JSON object has these keys:

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
----------------------------------------------------------------------
diff --git 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
index af16e61..998222f 100644
--- 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
+++ 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpConnector.java
@@ -24,8 +24,11 @@ import java.io.Serializable;
 import java.util.Properties;
 
 import org.apache.edgent.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.gson.JsonObject;
+import com.ibm.iotf.client.api.APIClient.ContentType;
 import com.ibm.iotf.client.device.Command;
 import com.ibm.iotf.client.device.DeviceClient;
 
@@ -34,6 +37,7 @@ import com.ibm.iotf.client.device.DeviceClient;
  */
 public class IotpConnector implements Serializable, AutoCloseable {
     private static final long serialVersionUID = 1L;
+    private static final Logger logger = 
LoggerFactory.getLogger(IotpConnector.class);
 
     private Properties options;
     private File optionsFile;
@@ -98,7 +102,24 @@ public class IotpConnector implements Serializable, 
AutoCloseable {
             throw new RuntimeException(e);
 
         }
-        client.publishEvent(eventId, event, qos);
+        boolean success = client.publishEvent(eventId, event, qos);
+        if (!success) {
+          // TODO log
+        }
+    }
+
+    void publishHttpEvent(String eventId, JsonObject event) {
+        try {
+            DeviceClient client = getClient();
+            client.api().publishDeviceEventOverHTTP(eventId, event, 
ContentType.json);
+        } catch (Exception e) {
+            // throw new RuntimeException(e);
+            // If the publish throws, a RuntimeException will cause
+            // everything to unwind and the app/topology can terminate.
+            // See the commentary/impl of MqttPublisher.accept().
+            // See EDGENT-382
+            logger.error("Unable to publish tuple for event " + eventId, e);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
----------------------------------------------------------------------
diff --git 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
new file mode 100644
index 0000000..ea640b9
--- /dev/null
+++ 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFixed.java
@@ -0,0 +1,44 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iotp.runtime;
+
+import org.apache.edgent.function.Consumer;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Consumer that publishes stream tuples as WIoTP device events over HTTP
+ * using a fixed event identifier.
+ */
+public class IotpDeviceHttpEventsFixed implements Consumer<JsonObject> {
+    private static final long serialVersionUID = 1L;
+    private final IotpConnector connector;
+    private final String eventId;
+
+    public IotpDeviceHttpEventsFixed(IotpConnector connector, String eventId) {
+        this.connector = connector;
+        this.eventId = eventId;
+    }
+
+    @Override
+    public void accept(JsonObject event) {
+        connector.publishHttpEvent(eventId, event);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
----------------------------------------------------------------------
diff --git 
a/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
new file mode 100644
index 0000000..5470bf3
--- /dev/null
+++ 
b/connectors/iotp/src/main/java/org/apache/edgent/connectors/iotp/runtime/IotpDeviceHttpEventsFunction.java
@@ -0,0 +1,51 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.iotp.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.UnaryOperator;
+
+import com.google.gson.JsonObject;
+
+/**
+ * Consumer that publishes stream tuples as WIoTP device events over HTTP,
+ * with functions deriving the event identifier and payload from each tuple.
+ *
+ */
+public class IotpDeviceHttpEventsFunction implements Consumer<JsonObject> {
+
+    private static final long serialVersionUID = 1L;
+    private final IotpConnector connector;
+    private final Function<JsonObject, String> eventId;
+    private UnaryOperator<JsonObject> payload;
+
+    public IotpDeviceHttpEventsFunction(IotpConnector connector, 
Function<JsonObject, String> eventId,
+            UnaryOperator<JsonObject> payload) {
+        this.connector = connector;
+        this.payload = payload;
+        this.eventId = eventId;
+    }
+
+    @Override
+    public void accept(JsonObject event) {
+        connector.publishHttpEvent(eventId.apply(event), payload.apply(event));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-edgent/blob/ede60042/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
----------------------------------------------------------------------
diff --git 
a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
 
b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
index eff81c2..9a41601 100644
--- 
a/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
+++ 
b/samples/connectors/src/main/java/org/apache/edgent/samples/connectors/iotp/IotpQuickstart2.java
@@ -18,11 +18,12 @@ under the License.
 */
 package org.apache.edgent.samples.connectors.iotp;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.edgent.connectors.iot.IotDevice;
 import org.apache.edgent.connectors.iot.QoS;
 import org.apache.edgent.connectors.iotp.IotpDevice;
 import org.apache.edgent.providers.direct.DirectProvider;
@@ -49,12 +50,14 @@ import com.ibm.iotf.devicemgmt.device.ManagedDevice;
  * as it received by the Quickstart service.
  * <P>
  * This sample demonstrates using the WIoTP API to initialize the IotpDevice
- * connector.
+ * connector as well as the ability to publish events using the WIoTP HTTP protocol.
  */
 public class IotpQuickstart2 {
 
     public static void main(String[] args) throws Exception {
-        boolean useDeviceClient = args.length > 0 && 
args[0].equals("useDeviceClient");
+        List<String> argList = Arrays.asList(args);
+        boolean useDeviceClient = argList.contains("useDeviceClient");
+        boolean useHttp = argList.contains("useHttp");
 
         DirectProvider tp = new DirectProvider();
         Topology topology = tp.newTopology("IotpQuickstart");
@@ -65,7 +68,7 @@ public class IotpQuickstart2 {
         options.setProperty("org", "quickstart");
         options.setProperty("type", IotpDevice.QUICKSTART_DEVICE_TYPE);
         options.setProperty("id", deviceId);
-        IotDevice device;
+        IotpDevice device;
         if (useDeviceClient) {
           System.out.println("Using WIoTP DeviceClient");
           device = new IotpDevice(topology, new DeviceClient(options));
@@ -99,8 +102,14 @@ public class IotpQuickstart2 {
             j.addProperty("objectTemp", v[2]);
             return j;
         });
-        
-        device.events(json, "sensors", QoS.FIRE_AND_FORGET);
+
+        if (!useHttp) {
+          device.events(json, "sensors", QoS.FIRE_AND_FORGET);
+        }
+        else {
+          System.out.println("Publishing events using HTTP");
+          device.httpEvents(json, "sensors");
+        }
 
         tp.submit(topology);
     }

Reply via email to