[ https://issues.apache.org/jira/browse/NIFI-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15310854#comment-15310854 ]
ASF GitHub Bot commented on NIFI-1767: -------------------------------------- Github user JPercivall commented on a diff in the pull request: https://github.com/apache/nifi/pull/349#discussion_r65418441 --- Diff: nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.aws.iot; + +import com.amazonaws.ClientConfiguration; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.services.iot.AWSIotClient; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor; +import org.apache.nifi.processors.aws.iot.util.AWS4Signer; +import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; + +import java.util.Date; +import java.util.concurrent.TimeUnit; + +public abstract class AbstractAWSIoTProcessor extends AbstractAWSCredentialsProviderProcessor<AWSIotClient> { + static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint"; + static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client"; + static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive"; + static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic"; + static final String PROP_NAME_QOS = "aws.iot.mqtt.qos"; + /** + * Amazon's current service limit on websocket connection duration + */ + static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24; + /** + * When to start indicating the need for connection renewal (in seconds before actual termination) + */ + static final Integer DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20; + static final String PROP_DEFAULT_CLIENT = AbstractAWSIoTProcessor.class.getSimpleName(); + /** + * Default QoS level for message delivery + */ + static final Integer DEFAULT_QOS = 0; + String awsTopic; + int awsQos; + MqttWebSocketAsyncClient mqttClient; + String awsEndpoint; + String awsClientId; + + private String awsRegion; 
+ private Integer awsKeepAliveSeconds; + private Date dtLastConnect; + + public static final PropertyDescriptor PROP_ENDPOINT = new PropertyDescriptor + .Builder().name(PROP_NAME_ENDPOINT) + .description("Your endpoint identifier in AWS IoT (e.g. A1B71MLXKNCXXX)") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_CLIENT = new PropertyDescriptor + .Builder().name(PROP_NAME_CLIENT) + .description("MQTT client ID to use. Under the cover your input will be extended by a random " + + "string to ensure a unique id among all conntected clients.") + .required(false) + .defaultValue(PROP_DEFAULT_CLIENT) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor + .Builder().name(PROP_NAME_KEEPALIVE) + .description("Seconds a WebSocket-connection remains open after automatically renewing it. " + + "This is neccessary due to Amazon's service limit on WebSocket connection duration. " + + "As soon as the limit is changed by Amazon you can adjust the value here. Never use " + + "a duration longer than supported by Amazon. This processor renews the connection " + + "" + DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " + + "actual expiration. If no value set the default will be " + PROP_DEFAULT_KEEPALIVE + ".") + .required(false) + .defaultValue(PROP_DEFAULT_KEEPALIVE.toString()) + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor + .Builder().name(PROP_NAME_TOPIC) + .description("MQTT topic to work with. 
(pattern: $aws/things/mything/shadow/update).") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor + .Builder().name(PROP_NAME_QOS) + .description("Decide for at most once (0) or at least once (1) message-receiption. " + + "Currently AWS IoT does not support QoS-level 2. If no value set the default QoS " + + "is " + DEFAULT_QOS + ".") + .required(false) + .allowableValues("0", "1") + .defaultValue("0") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + /** + * Create client using credentials provider. This is the preferred way for creating clients + */ + @Override + protected AWSIotClient createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials provider "); + // Actually this client is not needed. However, it is initialized due to the pattern of + // AbstractAWSCredentialsProviderProcessor + return new AWSIotClient(credentialsProvider, config); + } + + /** + * Create client using AWSCredentails + * + * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead + */ + @Override + protected AWSIotClient createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config) { + getLogger().info("Creating client using aws credentials "); + // Actually this client is not needed. 
it is initialized due to the pattern of + // AbstractAWSProcessor + return new AWSIotClient(credentials, config); + } + + /** + * Gets ready an MQTT client by connecting to a AWS IoT WebSocket endpoint specific to the properties + * @param context processor context + */ + void init(final ProcessContext context) { + // read out properties + awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue(); + awsRegion = context.getProperty(REGION).getValue(); + awsClientId = context.getProperty(PROP_CLIENT).isSet() ? context.getProperty(PROP_CLIENT).getValue() : PROP_DEFAULT_CLIENT; + awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).isSet() ? context.getProperty(PROP_KEEPALIVE).asInteger() : PROP_DEFAULT_KEEPALIVE; + awsTopic = context.getProperty(PROP_TOPIC).getValue(); + awsQos = context.getProperty(PROP_QOS).isSet() ? context.getProperty(PROP_QOS).asInteger() : DEFAULT_QOS; + // initialize and connect to mqtt endpoint + mqttClient = connect(context); + } + + @OnStopped + public void onStopped(final ProcessContext context) { + try { + mqttClient.disconnect(); + } catch (MqttException me) { + getLogger().warn("MQTT " + me.getMessage()); + } + getLogger().info("Disconnected"); + } + + /** + * Returns the lifetime-seconds of the established websocket-connection + * @return seconds + */ + long getConnectionDuration() { + return dtLastConnect != null + ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() - dtLastConnect.getTime()) + : awsKeepAliveSeconds + 1; + } + + /** + * In seconds get the remaining lifetime of the connection. It is not the actual time to + * expiration but an advice to when it is worth renewing the connection. + * @return seconds + */ + long getRemainingConnectionLifetime() { + return awsKeepAliveSeconds - DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION; + } + + /** + * Indicates if WebSocket connection is about to expire. It gives the caller an advice + * to renew the connection some time before the actual expiration. 
+ * @return Indication (if true caller should renew the connection) + */ + boolean isConnectionAboutToExpire() { + return getConnectionDuration() > getRemainingConnectionLifetime(); + } + + /** + * Connects to the websocket-endpoint over an MQTT client. + * @param context processcontext + * @return websocket connection client + */ + MqttWebSocketAsyncClient connect(ProcessContext context) { --- End diff -- The beginning portion of this method re-creates multiple different local variables to use in reconnecting. Maybe persist anything that can be re-used (like the connection options). > AWS IoT processors > ------------------ > > Key: NIFI-1767 > URL: https://issues.apache.org/jira/browse/NIFI-1767 > Project: Apache NiFi > Issue Type: New Feature > Components: Extensions > Reporter: Kay Lerch > Attachments: 20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf > > > Four new processors to communicate with Amazon’s managed device gateway > service AWS IoT. > h5.Use cases > * Consume reported states from a fleet of things managed and secured on > Amazon’s gateway service > * Propagate desired states to a fleet of things managed and secured on > Amazon’s gateway service > * Intercept M2M communication > * Hybrid IoT solutions: brings together a managed device gateway in the cloud > and on-premise data-consumers and -providers. > h4.GetIOTMqtt: > Opens up a connection to an AWS-account-specific websocket endpoint in order > to subscribe to any of the MQTT topics belonging to a registered thing in AWS > IoT. > h4.PutIOTMqtt > Opens up a connection to an AWS-account-specific websocket endpoint in order > to publish messages to any of the MQTT topics belonging to a registered thing > in AWS IoT. > h4.GetIOTShadow > In AWS IoT a physical thing is represented with its last reported state by > the so-called thing shadow. This processor reads out the current state of a > shadow (persisted as JSON) by requesting the managed API of AWS IoT. 
> h4.PutIOTShadow > In AWS IoT a physical thing is represented with its last reported state by > the so-called thing shadow. This processor updates the current state of a > shadow (persisted as JSON) by requesting the managed API of AWS IoT. An > update to a shadow lets AWS IoT propagate changes to the MQTT topics of the > thing. > h5.Known issues: > * It was hard for me to write appropriate integration tests since the MQTT > processors work with durable websocket-connections which are kind of tough to > test. With your help I would love to do a better job on testing and hand it > in later on. All of the processors were tested in a live-scenario which ran > over a longer period of time. I didn’t observe any issues. > * I got rid of all the properties for the deprecated > AWSCredentialProviderService and only made use of > AWSCredentialsProviderControllerService. If both are still necessary for > backward compatibility's sake I would add the deprecated feature. > Refers to Pull Request 349: https://github.com/apache/nifi/pull/349 -- This message was sent by Atlassian JIRA (v6.3.4#6332)