[
https://issues.apache.org/jira/browse/NIFI-1767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15873663#comment-15873663
]
ASF GitHub Bot commented on NIFI-1767:
--------------------------------------
Github user trixpan commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1521#discussion_r101915884
--- Diff:
nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/iot/AbstractAWSIoTProcessor.java
---
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.iot;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.iot.AWSIotClient;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import
org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import org.apache.nifi.processors.aws.iot.util.AWS4Signer;
+import org.apache.nifi.processors.aws.iot.util.MqttWebSocketAsyncClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+public abstract class AbstractAWSIoTProcessor extends
AbstractAWSCredentialsProviderProcessor<AWSIotClient> {
+ // Names under which the processor properties defined below are registered.
+ static final String PROP_NAME_ENDPOINT = "aws.iot.endpoint";
+ static final String PROP_NAME_CLIENT = "aws.iot.mqtt.client";
+ static final String PROP_NAME_KEEPALIVE = "aws.iot.mqtt.keepalive";
+ static final String PROP_NAME_TOPIC = "aws.iot.mqtt.topic";
+ static final String PROP_NAME_QOS = "aws.iot.mqtt.qos";
+ /**
+ * Amazon's current service limit on websocket connection duration,
+ * in seconds (60 * 60 * 24 = 24 hours). Used as the default keep-alive.
+ */
+ static final Integer PROP_DEFAULT_KEEPALIVE = 60 * 60 * 24;
+ /**
+ * When to start indicating the need for connection renewal (in
+ * seconds before actual termination)
+ */
+ static final Integer
DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION = 20;
+ // Default MQTT client ID: the simple name of this class.
+ static final String PROP_DEFAULT_CLIENT =
AbstractAWSIoTProcessor.class.getSimpleName();
+ /**
+ * Default QoS level for message delivery
+ */
+ static final Integer DEFAULT_QOS = 0;
+ // Configuration values populated from the processor properties in init(ProcessContext).
+ String awsTopic;
+ int awsQos;
+ // Client returned by connect(ProcessContext); null when connecting failed or init was never called.
+ MqttWebSocketAsyncClient mqttClient;
+ String awsEndpoint;
+ String awsClientId;
+
+ // NOTE(review): not referenced in the visible part of this class; -1 presumably
+ // means "no timeout" for MQTT actions -- confirm against the callers.
+ static final Integer mqttActionTimeout = -1;
+
+ private String awsRegion;
+ // Keep-alive in seconds; read from PROP_KEEPALIVE in init(ProcessContext).
+ private Integer awsKeepAliveSeconds;
+ // Time of the last successful connect; read by getConnectionDuration().
+ // NOTE(review): not assigned in the visible part of this class -- presumably set in connect(); confirm.
+ private Date dtLastConnect;
+ /** Required, non-empty property holding the AWS IoT endpoint identifier. */
+ public static final PropertyDescriptor PROP_ENDPOINT = new
PropertyDescriptor
+ .Builder().name(PROP_NAME_ENDPOINT)
+ .description("Your endpoint identifier in AWS IoT (e.g.
A1B71MLXKNCXXX)")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ /**
+ * Optional property holding the base MQTT client ID; defaults to
+ * {@link #PROP_DEFAULT_CLIENT}. Per its description, a random string is
+ * appended to the configured value to keep client IDs unique.
+ */
+ public static final PropertyDescriptor PROP_CLIENT = new
PropertyDescriptor
+ .Builder().name(PROP_NAME_CLIENT)
+ .description("MQTT client ID to use. Under the cover your
input will be extended by a random " +
+ "string to ensure a unique id among all connected
clients.")
+ .required(false)
+ .defaultValue(PROP_DEFAULT_CLIENT)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ /**
+  * Optional property holding the lifetime, in seconds, of a WebSocket
+  * connection before it is renewed. Must be a positive integer and defaults
+  * to {@link #PROP_DEFAULT_KEEPALIVE}.
+  */
+ public static final PropertyDescriptor PROP_KEEPALIVE = new PropertyDescriptor
+         .Builder().name(PROP_NAME_KEEPALIVE)
+         .description("Seconds a WebSocket-connection remains open before it is automatically renewed. " +
+                 "This is necessary due to Amazon's service limit on WebSocket connection duration. " +
+                 "As soon as the limit is changed by Amazon you can adjust the value here. Never use " +
+                 "a duration longer than supported by Amazon. This processor renews the connection " +
+                 DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION + " seconds before the " +
+                 "actual expiration. If no value is set the default will be " + PROP_DEFAULT_KEEPALIVE + ".")
+         .required(false)
+         .defaultValue(PROP_DEFAULT_KEEPALIVE.toString())
+         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+         .build();
+
+ /** Required, non-empty property holding the MQTT topic the processor works with. */
+ public static final PropertyDescriptor PROP_TOPIC = new
PropertyDescriptor
+ .Builder().name(PROP_NAME_TOPIC)
+ .description("MQTT topic to work with. (pattern:
$aws/things/mything/shadow/update).")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ /**
+  * Optional property selecting the MQTT quality-of-service level. Only
+  * "0" (at most once) and "1" (at least once) are allowed; defaults to
+  * {@link #DEFAULT_QOS}.
+  */
+ public static final PropertyDescriptor PROP_QOS = new PropertyDescriptor
+         .Builder().name(PROP_NAME_QOS)
+         .description("Decide for at most once (0) or at least once (1) message reception. " +
+                 "Currently AWS IoT does not support QoS-level 2. If no value is set the default QoS " +
+                 "is " + DEFAULT_QOS + ".")
+         .required(false)
+         .allowableValues("0", "1")
+         .defaultValue(DEFAULT_QOS.toString())
+         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+         .build();
+
+ /**
+ * Creates the AWS IoT client from a credentials provider. This is the
+ * preferred way of creating clients.
+ *
+ * @param context processor context (not used by this implementation)
+ * @param credentialsProvider provider handed to the new client
+ * @param config client configuration handed to the new client
+ * @return a new {@link AWSIotClient}
+ */
+ @Override
+ protected AWSIotClient createClient(final ProcessContext context,
final AWSCredentialsProvider credentialsProvider, final ClientConfiguration
config) {
+ getLogger().info("Creating client using aws credentials provider
");
+ // This client is not actually needed; it is only created because the
+ // AbstractAWSCredentialsProviderProcessor pattern requires one.
+ return new AWSIotClient(credentialsProvider, config);
+ }
+
+ /**
+  * Creates the AWS IoT client from static {@link AWSCredentials}.
+  *
+  * @param context processor context (not used by this implementation)
+  * @param credentials credentials handed to the new client
+  * @param config client configuration handed to the new client
+  * @return a new {@link AWSIotClient}
+  * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+  */
+ @Deprecated
+ @Override
+ protected AWSIotClient createClient(final ProcessContext context,
+         final AWSCredentials credentials, final ClientConfiguration config) {
+     getLogger().info("Creating client using aws credentials ");
+     // This client is not actually needed; it is only created because the
+     // AbstractAWSProcessor pattern requires one.
+     return new AWSIotClient(credentials, config);
+ }
+
+ /**
+  * Loads the processor configuration from the given context and then opens
+  * an MQTT client connection to the AWS IoT WebSocket endpoint described by
+  * those properties.
+  *
+  * @param context processor context
+  */
+ void init(final ProcessContext context) {
+     awsEndpoint = context.getProperty(PROP_ENDPOINT).getValue();
+     awsRegion = context.getProperty(REGION).getValue();
+
+     // optional client ID: fall back to the built-in default when unset
+     if (context.getProperty(PROP_CLIENT).isSet()) {
+         awsClientId = context.getProperty(PROP_CLIENT).getValue();
+     } else {
+         awsClientId = PROP_DEFAULT_CLIENT;
+     }
+
+     // optional keep-alive: fall back to the built-in default when unset
+     if (context.getProperty(PROP_KEEPALIVE).isSet()) {
+         awsKeepAliveSeconds = context.getProperty(PROP_KEEPALIVE).asInteger();
+     } else {
+         awsKeepAliveSeconds = PROP_DEFAULT_KEEPALIVE;
+     }
+
+     awsTopic = context.getProperty(PROP_TOPIC).getValue();
+
+     // optional QoS: fall back to the built-in default when unset
+     if (context.getProperty(PROP_QOS).isSet()) {
+         awsQos = context.getProperty(PROP_QOS).asInteger();
+     } else {
+         awsQos = DEFAULT_QOS;
+     }
+
+     // with the configuration in place, establish the MQTT connection
+     mqttClient = connect(context);
+ }
+
+ /**
+  * Disconnects the MQTT client when the processor is stopped.
+  * <p>
+  * Does nothing when no client exists: {@code connect} returns
+  * {@code null} if the endpoint address cannot be generated, and
+  * {@code init} may never have been called. A failing disconnect is logged
+  * as a warning and not rethrown.
+  *
+  * @param context processor context (not used)
+  */
+ @OnStopped
+ public void onStopped(final ProcessContext context) {
+     if (mqttClient == null) {
+         // nothing was ever connected, so there is nothing to disconnect
+         return;
+     }
+     try {
+         mqttClient.disconnect();
+         // only report success when disconnect() did not throw
+         getLogger().info("Disconnected");
+     } catch (MqttException me) {
+         // keep the cause so the stack trace is available in the log
+         getLogger().warn("MQTT " + me.getMessage(), me);
+     }
+ }
+
+ /**
+ * Returns the lifetime-seconds of the established websocket-connection
+ * @return seconds
+ */
+ long getConnectionDuration() {
+ return dtLastConnect != null
+ ? TimeUnit.MILLISECONDS.toSeconds(new Date().getTime() -
dtLastConnect.getTime())
+ : awsKeepAliveSeconds + 1;
+ }
+
+ /**
+ * In seconds get the remaining lifetime of the connection. It is not
the actual time to
+ * expiration but an advice to when it is worth renewing the
connection.
+ * @return seconds
+ */
+ long getRemainingConnectionLifetime() {
+ return awsKeepAliveSeconds -
DEFAULT_CONNECTION_RENEWAL_BEFORE_KEEP_ALIVE_EXPIRATION;
+ }
+
+ /**
+ * Indicates if WebSocket connection is about to expire. It gives the
caller an advice
+ * to renew the connection some time before the actual expiration.
+ * @return Indication (if true caller should renew the connection)
+ */
+ boolean isConnectionAboutToExpire() {
+ return getConnectionDuration() > getRemainingConnectionLifetime();
+ }
+
+ /**
+ * Connects to the websocket-endpoint over an MQTT client.
+ * @param context processcontext
+ * @return websocket connection client
+ */
+ MqttWebSocketAsyncClient connect(ProcessContext context) {
+ getCredentialsProvider(context).refresh();
+ AWSCredentials awsCredentials =
getCredentialsProvider(context).getCredentials();
+ MqttWebSocketAsyncClient _mqttClient = null;
+
+ // generate mqtt endpoint-address with authentication details
+ String strEndpointAddress;
+ try {
+ strEndpointAddress = AWS4Signer.getAddress(awsRegion,
awsEndpoint, awsCredentials);
+ } catch (Exception e) {
+ getLogger().error("Error while generating AWS endpoint-address
caused by " + e.getMessage());
+ return null;
+ }
+ // extend clientId with random string in order to ensure unique id
per connection
+ String clientId = awsClientId + RandomStringUtils.random(12, true,
false);
--- End diff --
JPercivall on Jun 2, 2016 Contributor
I'm on the fence about appending a random string to the client ID to ensure
uniqueness. This could lead to laziness when creating the clientId and make
it hard for the user to tell where a connection came from (when looking at the
AWS UI). Wouldn't it be better not to append the random string and instead
force the user to create better clientIds?
KayLerch on Jun 2, 2016 • edited
The processor subscribing to a topic needs two clientIds, because for QoS 1 it
opens another after closing the first websocket connection. That said, I either
have to extend the user's clientId with something unique or set up a second
property and ask the user for a second clientId. Maybe the suffix should not
be 12 characters long but simply alternate between "-1" and "-2". What do you think?
JPercivall on Jun 2, 2016 Contributor
Hmmm, it is tricky/annoying that there isn't a better way to handle
reconnecting with the same connection/clientId without losing messages.
If there will only ever be two different connections at a time then I would
vote for switching between appending "-1" and "-2". This will give better
protection against having multiple connections with the same base clientId.
Also be sure to mention this in the description of the "clientId" property.
> AWS IoT processors
> ------------------
>
> Key: NIFI-1767
> URL: https://issues.apache.org/jira/browse/NIFI-1767
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Kay Lerch
> Attachments: 20160413_apache-nifi-aws-iot-pull-request_lerchkay.pdf
>
>
> Four new processors to communicate with Amazon’s managed device gateway
> service AWS IoT.
> h5.Use cases
> * Consume reported states from a fleet of things managed and secured on
> Amazon’s gateway service
> * Propagate desired states to a fleet of things managed and secured on
> Amazon’s gateway service
> * Intercept M2M communication
> * Hybrid IoT solutions: bring together a managed device gateway in the cloud
> and on-premise data consumers and providers.
> h4.GetIOTMqtt:
> Opens up a connection to an AWS-account-specific websocket endpoint in order
> to subscribe to any of the MQTT topics belonging to a registered thing in AWS
> IoT.
> h4.PutIOTMqtt
> Opens up a connection to an AWS-account-specific websocket endpoint in order
> to publish messages to any of the MQTT topics belonging to a registered thing
> in AWS IoT.
> h4.GetIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by
> the so-called thing shadow. This processor reads out the current state of a
> shadow (persisted as JSON) by requesting the managed API of AWS IoT.
> h4.PutIOTShadow
> In AWS IoT a physical thing is represented with its last reported state by
> the so-called thing shadow. This processor updates the current state of a
> shadow (persisted as JSON) by requesting the managed API of AWS IoT. An
> update to a shadow lets AWS IoT propagate changes to the MQTT topics of the
> thing.
> h5.Known issues:
> * It was hard for me to write appropriate integration tests since the MQTT
> processors work with durable websocket connections, which are kind of tough to
> test. With your help I would love to do a better job on testing and hand it
> in later on. All of the processors were tested in a live scenario which ran
> over a longer period of time. I didn’t observe any issues.
> * I got rid of all the properties for the deprecated
> AWSCredentialProviderService and only made use of
> AWSCredentialsProviderControllerService. If both are still necessary for
> backward compatibility's sake, I would add the deprecated feature back.
> Refers to Pull Request 349: https://github.com/apache/nifi/pull/349
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)