[
https://issues.apache.org/jira/browse/NIFI-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873680#comment-15873680
]
ASF GitHub Bot commented on NIFI-1767:
--------------------------------------
Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1521#discussion_r101916245
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/GetAWSIoTShadow.java
---
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.iot;
+
+import com.amazonaws.services.iotdata.AWSIotDataClient;
+import com.amazonaws.services.iotdata.model.GetThingShadowRequest;
+import com.amazonaws.services.iotdata.model.GetThingShadowResult;
+import org.apache.nifi.annotation.behavior.*;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.HashMap;
+import java.util.Map;
+
+@EventDriven
+@Tags({"Amazon", "AWS", "IOT", "Shadow", "Get"})
+@InputRequirement(InputRequirement.Requirement.INPUT_ALLOWED)
+@CapabilityDescription("Gets last persisted state of a thing in AWS IoT by
reading out the shadow. " +
+ "A shadow might change more often than you get triggered. In order
to get every message send " +
+ "out by a thing you better use ConsumeAWSIoTMqtt processor. You
can dynamically set a thing-name " +
+ "when overriding the processor-configuration with a
message-attribute \"aws.iot.thing.override\".")
+@SeeAlso({ ConsumeAWSIoTMqtt.class })
+@ReadsAttributes({
+ @ReadsAttribute(attribute = "aws.iot.thing.override", description
= "Overrides the processor configuration for topic."),
+})
+@WritesAttributes({
+ @WritesAttribute(attribute = "aws.iot.thing", description = "Thing
name in AWS IoT"),
+})
+public class GetAWSIoTShadow extends AbstractAWSIoTShadowProcessor {
+ public static final List<PropertyDescriptor> properties =
Collections.unmodifiableList(
+ Arrays.asList(
+ PROP_THING,
+ AWS_CREDENTIALS_PROVIDER_SERVICE,
+ TIMEOUT,
+ PROXY_HOST,
+ PROXY_HOST_PORT,
+ REGION));
+
+ private final static String ATTR_NAME_THING = PROP_NAME_THING +
".override";
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return Collections.singleton(REL_SUCCESS);
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) throws ProcessException {
+ final AWSIotDataClient iotClient = this.getClient();
+
+ if (iotClient == null) {
+ getLogger().error("AWS-Client was not initialized. See logs to
find reasons.");
+ return;
+ }
+ // get flowfile
+ FlowFile flowFile = session.get();
+ // if provided override configured thing name with name from
corresponding message attribute
+ String thingName = flowFile != null &&
flowFile.getAttributes().containsKey(ATTR_NAME_THING)
+ ? flowFile.getAttribute(ATTR_NAME_THING)
+ : context.getProperty(PROP_NAME_THING).getValue();
+
+ FlowFile flowFileOut = flowFile == null ? session.create() :
flowFile;
+
+ // ask shadow of the thing for last reported state by requesting
the API of AWS
+ final GetThingShadowRequest iotRequest = new
GetThingShadowRequest().withThingName(thingName);
+ final GetThingShadowResult iotResponse =
iotClient.getThingShadow(iotRequest);
+
+ //FlowFile flowFileOut = session.create();
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(PROP_NAME_THING, thingName);
+ flowFileOut = session.putAllAttributes(flowFileOut, attributes);
+
+ flowFileOut = session.write(flowFileOut, new
OutputStreamCallback() {
+ @Override
+ public void process(final OutputStream out) throws IOException
{
+ out.write(iotResponse.getPayload().array());
+ }
+ });
+ session.transfer(flowFileOut, REL_SUCCESS);
+ session.commit();
--- End diff --
**JPercivall**
Should emit a "FETCH" provenance event when created using an incoming
flowfile and "RECEIVE" when the processor is acting as a source.
> AWS IoT processors
> ------------------
>
> Key: NIFI-1767
> URL: https://issues.apache.org/jira/browse/NIFI-1767
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Kay Lerch
> Attachments: 20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf
>
>
> Four new processors to communicate with Amazon’s managed device gateway
> service AWS IoT.
> h5.Use cases
> * Consume reported states from a fleet of things managed and secured on
> Amazon’s gateway service
> * Propagate desired states to a fleet of things managed and secured on
> Amazon’s gateway service
> * Intercept M2M communication
> * Hybrid IoT solutions: brings together a managed device gateway in the cloud
> and onpremise data-consumers and -providers.
> h4.GetIOTMqtt:
> Opens up a connection to an AWS-account-specific websocket endpoint in order
> to subscribe to any of the MQTT topics belonging to a registered thing in AWS
> IoT.
> h4.PutIOTMqtt
> Opens up a connection to an AWS-account-specific websocket endpoint in order
> to publish messages to any of the MQTT topics belonging to a registered thing
> in AWS IoT.
> h4.GetIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by
> the so-called thing shadow. This processor reads out the current state of a
> shadow (persisted as JSON) by requesting the managed API of AWS IoT.
> h4.PutIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by
> the so-called thing shadow. This processor updates the current state of a
> shadow (persisted as JSON) by requesting the managed API of AWS IoT. An
> update to a shadow lets AWS IoT propagate changes to the MQTT topics of the
> thing.
> h5.Known issues:
> * It was hard for me to write appropriate integration tests since the MQTT
> processors work with durable websocket-connections which are kind of tough to
> test. With your help I would love to do a better job on testing and hand it
> in later on. All of the processors were tested in a live-scenario which ran
> over a longer period of time. Didn’t observe any issue.
> * I got rid of all the properties for the deprecated
> AWSCredentialProviderService and only made use of
> AWSCredentialsProviderControllerService. If both are still necessary for
> backward-compatibilities sake I would add the deprecated feature.
> Refers to Pull Request 349: https://github.com/apache/nifi/pull/349
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)