[ 
https://issues.apache.org/jira/browse/NIFI-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310854#comment-15310854
 ] 

ASF GitHub Bot commented on NIFI-1767:
--------------------------------------

Github user JPercivall commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/349#discussion_r65418441
  
    --- Diff: 
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java
 ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.aws.iot;
    +
    +import com.amazonaws.ClientConfiguration;
    +import com.amazonaws.auth.AWSCredentials;
    +import com.amazonaws.auth.AWSCredentialsProvider;
    +import com.amazonaws.services.iot.AWSIotClient;
    +import org.apache.commons.lang3.RandomStringUtils;
    +import org.apache.nifi.annotation.lifecycle.OnStopped;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import 
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
    +import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
    +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +
    +import java.util.Date;
    +import java.util.concurrent.TimeUnit;
    +
    +public abstract class AbstractAWSIoTProcessor extends 
AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
    +    static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
    +    static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
    +    static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
    +    static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
    +    static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
    +    /**
    +     * Amazon's current service limit on websocket connection duration
    +     */
    +    static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
    +    /**
    +     * When to start indicating the need for connection renewal (in 
seconds before actual termination)
    +     */
    +    static final Integer 
DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
    +    static final String PROP_DEFAULT_CLIENT = 
AbstractAWSIoTProcessor.class.getSimpleName();
    +    /**
    +     * Default QoS level for message delivery
    +     */
    +    static final Integer DEFAULT_QOS = 0;
    +    String awsTopic;
    +    int awsQos;
    +    MqttWebSocketAsyncClient mqttClient;
    +    String awsEndpoint;
    +    String awsClientId;
    +
    +    private String awsRegion;
    +    private Integer awsKeepAliveSeconds;
    +    private Date dtLastConnect;
    +
    +    public static final PropertyDescriptor PROP_ENDPOINT = new 
PropertyDescriptor
    +            .Builder().name(PROP_NAME_ENDPOINT)
    +            .description("Your endpoint identifier in AWS IoT (e.g. 
A1B71MLXKNCXXX)")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_CLIENT = new 
PropertyDescriptor
    +            .Builder().name(PROP_NAME_CLIENT)
    +            .description("MQTT client ID to use. Under the cover your 
input will be extended by a random " +
    +                    "string to ensure a unique id among all conntected 
clients.")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_CLIENT)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_KEEPALIVE = new 
PropertyDescriptor
    +            .Builder().name(PROP_NAME_KEEPALIVE)
    +            .description("Seconds a WebSocket-connection remains open 
after automatically renewing it. " +
    +                    "This is neccessary due to Amazon's service limit on 
WebSocket connection duration. " +
    +                    "As soon as the limit is changed by Amazon you can 
adjust the value here. Never use " +
    +                    "a duration longer than supported by Amazon. This 
processor renews the connection " +
    +                    "" + 
DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the 
" +
    +                    "actual expiration. If no value set the default will 
be " + PROP_DEFAULT_KEEPALIVE + ".")
    +            .required(false)
    +            .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_TOPIC = new 
PropertyDescriptor
    +            .Builder().name(PROP_NAME_TOPIC)
    +            .description("MQTT topic to work with. (pattern: 
$aws/things/mything/shadow/update).")
    +            .required(true)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    public static final PropertyDescriptor PROP_QOS = new 
PropertyDescriptor
    +            .Builder().name(PROP_NAME_QOS)
    +            .description("Decide for at most once (0) or at least once (1) 
message-receiption. " +
    +                    "Currently AWS IoT does not support QoS-level 2. If no 
value set the default QoS " +
    +                    "is " + DEFAULT_QOS + ".")
    +            .required(false)
    +            .allowableValues("0", "1")
    +            .defaultValue("0")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * Create client using credentials provider. This is the preferred way 
for creating clients
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, 
final AWSCredentialsProvider credentialsProvider, final ClientConfiguration 
config) {
    +        getLogger().info("Creating client using aws credentials provider 
");
    +        // Actually this client is not needed. However, it is initialized 
due to the pattern of
    +        // AbstractAWSCredentialsProviderProcessor
    +        return new AWSIotClient(credentialsProvider, config);
    +    }
    +
    +    /**
    +     * Create client using AWSCredentails
    +     *
    +     * @deprecated use {@link #createClient(ProcessContext, 
AWSCredentialsProvider, ClientConfiguration)} instead
    +     */
    +    @Override
    +    protected AWSIotClient createClient(final ProcessContext context, 
final AWSCredentials credentials, final ClientConfiguration config) {
    +        getLogger().info("Creating client using aws credentials ");
    +        // Actually this client is not needed. it is initialized due to 
the pattern of
    +        // AbstractAWSProcessor
    +        return new AWSIotClient(credentials, config);
    +    }
    +
    +    /**
    +     * Gets ready an MQTT client by connecting to a AWS IoT WebSocket 
endpoint specific to the properties
    +     * @param context processor context
    +     */
    +    void init(final ProcessContext context) {
    +        // read out properties
    +        awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
    +        awsRegion = context.getProperty(REGION).getValue();
    +        awsClientId = context.getProperty(PROP_CLIENT).isSet() ? 
context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT;
    +        awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() 
? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE;
    +        awsTopic = context.getProperty(PROP_TOPIC).getValue();
    +        awsQos = context.getProperty(PROP_QOS).isSet() ? 
context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS;
    +        // initialize and connect to mqtt endpoint
    +        mqttClient = connect(context);
    +    }
    +
    +    @OnStopped
    +    public void onStopped(final ProcessContext context) {
    +        try {
    +            mqttClient.disconnect();
    +        } catch (MqttException me) {
    +            getLogger().warn("MQTT " + me.getMessage());
    +        }
    +        getLogger().info("Disconnected");
    +    }
    +
    +    /**
    +     * Returns the lifetime-seconds of the established websocket-connection
    +     * @return seconds
    +     */
    +    long getConnectionDuration() {
    +        return dtLastConnect != null
    +                ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - 
dtLastConnect.getTime())
    +                : awsKeepAliveSeconds + 1;
    +    }
    +
    +    /**
    +     * In seconds get the remaining lifetime of the connection. It is not 
the actual time to
    +     * expiration but an advice to when it is worth renewing the 
connection.
    +     * @return seconds
    +     */
    +    long getRemainingConnectionLifetime() {
    +        return awsKeepAliveSeconds - 
DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
    +    }
    +
    +    /**
    +     * Indicates if WebSocket connection is about to expire. It gives the 
caller an advice
    +     * to renew the connection some time before the actual expiration.
    +     * @return Indication (if true caller should renew the connection)
    +     */
    +    boolean isConnectionAboutToExpire() {
    +        return getConnectionDuration() > getRemainingConnectionLifetime();
    +    }
    +
    +    /**
    +     * Connects to the websocket-endpoint over an MQTT client.
    +     * @param context processcontext
    +     * @return websocket connection client
    +     */
    +    MqttWebSocketAsyncClient connect(ProcessContext context) {
    --- End diff --
    
    The beginning portion of this method re-creates multiple local 
variables to use when reconnecting. Maybe persist anything that can be re-used 
(like the connection options).


> AWS IoT processors
> ------------------
>
>                 Key: NIFI-1767
>                 URL: https://issues.apache.org/jira/browse/NIFI-1767
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Kay Lerch
>         Attachments: 20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf
>
>
> Four new processors to communicate with Amazon’s managed device gateway 
> service AWS IoT.
> h5.Use cases
> * Consume reported states from a fleet of things managed and secured on 
> Amazon’s gateway service
> * Propagate desired states to a fleet of things managed and secured on 
> Amazon’s gateway service
> * Intercept M2M communication
> * Hybrid IoT solutions: bring together a managed device gateway in the cloud 
> and on-premises data consumers and providers.
> h4.GetIOTMqtt:
> Opens up a connection to an AWS-account-specific websocket endpoint in order 
> to subscribe to any of the MQTT topics belonging to a registered thing in AWS 
> IoT.
> h4.PutIOTMqtt
> Opens up a connection to an AWS-account-specific websocket endpoint in order 
> to publish messages to any of the MQTT topics belonging to a registered thing 
> in AWS IoT.
> h4.GetIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by 
> the so-called thing shadow. This processor reads out the current state of a 
> shadow (persisted as JSON) by requesting the managed API of AWS IoT.
> h4.PutIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by 
> the so-called thing shadow. This processor updates the current state of a 
> shadow (persisted as JSON) by requesting the managed API of AWS IoT. An 
> update to a shadow lets AWS IoT propagate changes to the MQTT topics of the 
> thing.
> h5.Known issues:
> * It was hard for me to write appropriate integration tests since the MQTT 
> processors work with durable websocket connections, which are kind of tough to 
> test. With your help I would love to do a better job on testing and hand it 
> in later on. All of the processors were tested in a live scenario which ran 
> over a longer period of time. I didn’t observe any issues.
> * I got rid of all the properties for the deprecated 
> AWSCredentialProviderService and only made use of 
> AWSCredentialsProviderControllerService. If both are still necessary for 
> backward compatibility's sake, I would add the deprecated feature back.
> Refers to Pull Request 349: https://github.com/apache/nifi/pull/349



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to