Repository: nifi
Updated Branches:
  refs/heads/master 33ba1a822 -> 710761642


NIFI-2872 Create PutCloudWatchMetric Processor

This closes #1125.

Signed-off-by: James Wing <jvw...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/71076164
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/71076164
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/71076164

Branch: refs/heads/master
Commit: 7107616420ed31c057ff7bf7f06bf36e5740b3ea
Parents: 33ba1a8
Author: Edgardo <edgardo.v...@nextcentury.com>
Authored: Tue Oct 11 14:07:42 2016 -0400
Committer: James Wing <jvw...@gmail.com>
Committed: Thu Oct 13 19:53:53 2016 -0700

----------------------------------------------------------------------
 .../aws/cloudwatch/PutCloudWatchMetric.java     | 207 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   4 +-
 .../aws/cloudwatch/ITPutCloudWatchMetric.java   |  73 +++++++
 .../aws/cloudwatch/MockPutCloudWatchMetric.java |  49 +++++
 .../aws/cloudwatch/TestPutCloudWatchMetric.java | 151 ++++++++++++++
 5 files changed, 483 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/71076164/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
new file mode 100644
index 0000000..56f2d99
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/cloudwatch/PutCloudWatchMetric.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.cloudwatch;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+
+
+/**
+ * Processor that publishes a single metric datum to Amazon CloudWatch for every incoming FlowFile.
+ *
+ * <p>The namespace, metric name, value, optional timestamp and optional unit are read from processor
+ * properties, each of which supports Expression Language evaluated against the FlowFile attributes.
+ * FlowFiles are routed to {@code REL_SUCCESS} once the metric was published and are penalized and routed
+ * to {@code REL_FAILURE} when any property cannot be parsed or the publish call throws.</p>
+ */
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"amazon", "aws", "cloudwatch", "metrics", "put", "publish"})
+@CapabilityDescription("Publishes metrics to Amazon CloudWatch")
+public class PutCloudWatchMetric extends AbstractAWSCredentialsProviderProcessor<AmazonCloudWatchClient> {
+
+    /** Relationships exposed by this processor: success and failure. */
+    public static final Set<Relationship> relationships = Collections.unmodifiableSet(
+            new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
+
+    /**
+     * Validates that a literal property value is a finite double.
+     *
+     * <p>Values containing Expression Language are accepted as-is because they can only be checked once
+     * they are evaluated against a FlowFile. Literal values must be parseable by
+     * {@link Double#parseDouble(String)} and must be neither NaN nor infinite, since CloudWatch does not
+     * support those special values.</p>
+     */
+    private static final Validator DOUBLE_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String input, ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
+                return (new ValidationResult.Builder()).subject(subject).input(input)
+                        .explanation("Expression Language Present").valid(true).build();
+            }
+
+            String reason = null;
+            if (input == null) {
+                // Double.parseDouble(null) would throw a NullPointerException instead of reporting invalid input
+                reason = "not a valid Double";
+            } else {
+                try {
+                    if (!isFinite(Double.parseDouble(input))) {
+                        reason = "NaN and infinite values are not supported by CloudWatch";
+                    }
+                } catch (NumberFormatException e) {
+                    reason = "not a valid Double";
+                }
+            }
+
+            return (new ValidationResult.Builder()).subject(subject).input(input)
+                    .explanation(reason).valid(reason == null).build();
+        }
+    };
+
+    /** Namespace under which the metric is published. Required; supports Expression Language. */
+    public static final PropertyDescriptor NAMESPACE = new PropertyDescriptor.Builder()
+            .name("Namespace")
+            .displayName("Namespace")
+            .description("The namespace for the metric data for CloudWatch")
+            .required(true)
+            .expressionLanguageSupported(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** Name of the metric, 1 to 255 characters long. Required; supports Expression Language. */
+    public static final PropertyDescriptor METRIC_NAME = new PropertyDescriptor.Builder()
+            .name("MetricName")
+            .displayName("MetricName")
+            .description("The name of the metric")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(new StandardValidators.StringLengthValidator(1, 255))
+            .build();
+
+    /** Value of the metric as a double. Required; supports Expression Language. */
+    public static final PropertyDescriptor VALUE = new PropertyDescriptor.Builder()
+            .name("Value")
+            .displayName("Value")
+            .description("The value for the metric. Must be a double")
+            .expressionLanguageSupported(true)
+            .required(true)
+            .addValidator(DOUBLE_VALIDATOR)
+            .build();
+
+    /** Optional timestamp of the metric in epoch milliseconds. Supports Expression Language. */
+    public static final PropertyDescriptor TIMESTAMP = new PropertyDescriptor.Builder()
+            .name("Timestamp")
+            .displayName("Timestamp")
+            .description("A point in time expressed as the number of milliseconds since Jan 1, 1970 00:00:00 UTC. "
+                    + "If not specified, the default value is set to the time the metric data was received")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.LONG_VALIDATOR)
+            .build();
+
+    /** Optional unit of the metric. Supports Expression Language. */
+    public static final PropertyDescriptor UNIT = new PropertyDescriptor.Builder()
+            .name("Unit")
+            .displayName("Unit")
+            .description("The unit of the metric. (e.g Seconds, Bytes, Megabytes, Percent, Count,  Kilobytes/Second, "
+                    + "Terabits/Second, Count/Second) For details see "
+                    + "http://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html")
+            .expressionLanguageSupported(true)
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    /** All properties supported by this processor: the metric properties followed by the inherited AWS ones. */
+    public static final List<PropertyDescriptor> properties =
+            Collections.unmodifiableList(
+                    Arrays.asList(NAMESPACE, METRIC_NAME, VALUE, TIMESTAMP, UNIT, REGION, ACCESS_KEY, SECRET_KEY,
+                            CREDENTIALS_FILE, AWS_CREDENTIALS_PROVIDER_SERVICE, TIMEOUT, SSL_CONTEXT_SERVICE,
+                            ENDPOINT_OVERRIDE, PROXY_HOST, PROXY_HOST_PORT)
+            );
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    /**
+     * Create client using aws credentials provider. This is the preferred way for creating clients
+     */
+    @Override
+    protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentialsProvider awsCredentialsProvider,
+            ClientConfiguration clientConfiguration) {
+        getLogger().info("Creating client using aws credentials provider");
+        return new AmazonCloudWatchClient(awsCredentialsProvider, clientConfiguration);
+    }
+
+    /**
+     * Create client using AWSCredentials
+     *
+     * @deprecated use {@link #createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)} instead
+     */
+    @Override
+    protected AmazonCloudWatchClient createClient(ProcessContext processContext, AWSCredentials awsCredentials,
+            ClientConfiguration clientConfiguration) {
+        getLogger().debug("Creating client with aws credentials");
+        return new AmazonCloudWatchClient(awsCredentials, clientConfiguration);
+    }
+
+    /**
+     * Builds a {@link MetricDatum} from the processor properties evaluated against the next FlowFile and
+     * publishes it through {@link #putMetricData(PutMetricDataRequest)}.
+     *
+     * <p>Any exception raised while evaluating, parsing or publishing is logged; the FlowFile is then
+     * penalized and transferred to {@code REL_FAILURE}. Otherwise it is transferred to {@code REL_SUCCESS}.</p>
+     *
+     * @param context provides the configured property values
+     * @param session provides the FlowFile and is used to route it
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+        MetricDatum datum = new MetricDatum();
+
+        try {
+            datum.setMetricName(context.getProperty(METRIC_NAME).evaluateAttributeExpressions(flowFile).getValue());
+            // Expression Language results bypass DOUBLE_VALIDATOR, so the value is checked again here
+            datum.setValue(parseMetricValue(context.getProperty(VALUE).evaluateAttributeExpressions(flowFile).getValue()));
+
+            final String timestamp = context.getProperty(TIMESTAMP).evaluateAttributeExpressions(flowFile).getValue();
+            if (timestamp != null) {
+                datum.setTimestamp(new Date(Long.parseLong(timestamp)));
+            }
+
+            final String unit = context.getProperty(UNIT).evaluateAttributeExpressions(flowFile).getValue();
+            if (unit != null) {
+                datum.setUnit(unit);
+            }
+
+            final PutMetricDataRequest metricDataRequest = new PutMetricDataRequest()
+                    .withNamespace(context.getProperty(NAMESPACE).evaluateAttributeExpressions(flowFile).getValue())
+                    .withMetricData(datum);
+
+            putMetricData(metricDataRequest);
+            session.transfer(flowFile, REL_SUCCESS);
+            getLogger().info("Successfully published cloudwatch metric for {}", new Object[]{flowFile});
+        } catch (final Exception e) {
+            getLogger().error("Failed to publish cloudwatch metric for {} due to {}", new Object[]{flowFile, e});
+            flowFile = session.penalize(flowFile);
+            session.transfer(flowFile, REL_FAILURE);
+        }
+
+    }
+
+    /**
+     * Sends the given request to CloudWatch using the client returned by {@code getClient()}.
+     * Kept as a separate protected method so that tests can override it.
+     *
+     * @param metricDataRequest the request to publish
+     * @return the result returned by the CloudWatch client
+     * @throws AmazonClientException if the client call fails
+     */
+    protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException {
+        final AmazonCloudWatchClient client = getClient();
+        final PutMetricDataResult result = client.putMetricData(metricDataRequest);
+        return result;
+    }
+
+    /**
+     * Parses an evaluated metric value and rejects the special values CloudWatch does not support.
+     *
+     * @param rawValue the evaluated property value
+     * @return the parsed, finite value
+     * @throws NumberFormatException if the text is not a double, or is NaN or infinite
+     */
+    private static double parseMetricValue(final String rawValue) {
+        final double parsed = Double.parseDouble(rawValue);
+        if (!isFinite(parsed)) {
+            throw new NumberFormatException("NaN and infinite values are not supported by CloudWatch: " + rawValue);
+        }
+        return parsed;
+    }
+
+    /** Returns true when the value is neither NaN nor infinite. */
+    private static boolean isFinite(final double value) {
+        return !Double.isNaN(value) && !Double.isInfinite(value);
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/71076164/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index df265c3..27e39f0 100644
--- 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -20,9 +20,11 @@ org.apache.nifi.processors.aws.sns.PutSNS
 org.apache.nifi.processors.aws.sqs.GetSQS
 org.apache.nifi.processors.aws.sqs.PutSQS
 org.apache.nifi.processors.aws.sqs.DeleteSQS
-org.apache.nifi.processors.aws.lambda.PutLambda
+org.apache.nifi.processors.aws.lambda.PutLambda
 org.apache.nifi.processors.aws.kinesis.firehose.PutKinesisFirehose
 org.apache.nifi.processors.aws.dynamodb.GetDynamoDB
 org.apache.nifi.processors.aws.dynamodb.PutDynamoDB
 org.apache.nifi.processors.aws.dynamodb.DeleteDynamoDB
 org.apache.nifi.processors.aws.kinesis.stream.PutKinesisStream
+org.apache.nifi.processors.aws.cloudwatch.PutCloudWatchMetric
+

http://git-wip-us.apache.org/repos/asf/nifi/blob/71076164/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java
new file mode 100644
index 0000000..48eeecf
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/ITPutCloudWatchMetric.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.cloudwatch;
+
+import org.apache.nifi.processors.aws.AbstractAWSCredentialsProviderProcessor;
+import 
org.apache.nifi.processors.aws.credentials.provider.service.AWSCredentialsProviderControllerService;
+import org.apache.nifi.processors.aws.sns.PutSNS;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Provides integration level testing with actual AWS CloudWatch resources for {@link PutCloudWatchMetric}
+ * and requires additional configuration and resources to work.
+ *
+ * <p>Both tests read AWS credentials from {@code aws-credentials.properties} in the user's home directory.</p>
+ */
+public class ITPutCloudWatchMetric {
+
+    /** Location of the credentials file used by both tests. */
+    private static final String CREDENTIALS_FILE = System.getProperty("user.home") + "/aws-credentials.properties";
+
+    /**
+     * Publishes one metric using the credentials file property and expects the FlowFile on success.
+     */
+    @Test
+    public void testPublish() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new PutCloudWatchMetric());
+
+        runner.setProperty(PutCloudWatchMetric.NAMESPACE, "Test");
+        runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "Test");
+        runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
+        runner.setProperty(PutCloudWatchMetric.CREDENTIALS_FILE, CREDENTIALS_FILE);
+
+        runner.enqueue(new byte[] {});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
+    }
+
+    /**
+     * Publishes one metric using a credentials provider controller service and expects the FlowFile on success.
+     */
+    @Test
+    public void testPublishWithCredentialsProviderService() throws Throwable {
+        final TestRunner runner = TestRunners.newTestRunner(new PutCloudWatchMetric());
+        runner.setValidateExpressionUsage(false);
+
+        final AWSCredentialsProviderControllerService serviceImpl = new AWSCredentialsProviderControllerService();
+        runner.addControllerService("awsCredentialsProvider", serviceImpl);
+
+        // reuse the shared constant instead of rebuilding the same path
+        runner.setProperty(serviceImpl, AbstractAWSCredentialsProviderProcessor.CREDENTIALS_FILE, CREDENTIALS_FILE);
+        runner.enableControllerService(serviceImpl);
+
+        runner.assertValid(serviceImpl);
+
+        runner.setProperty(PutCloudWatchMetric.NAMESPACE, "Test");
+        runner.setProperty(PutCloudWatchMetric.METRIC_NAME, "Test");
+        runner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
+        // NOTE(review): presumably the same inherited descriptor as on PutCloudWatchMetric; kept as PutSNS
+        // so the existing import stays in use
+        runner.setProperty(PutSNS.AWS_CREDENTIALS_PROVIDER_SERVICE, "awsCredentialsProvider");
+
+        runner.enqueue(new byte[] {});
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/71076164/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
new file mode 100644
index 0000000..839c4d6
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/MockPutCloudWatchMetric.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.cloudwatch;
+
+import java.util.List;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
+import com.amazonaws.services.cloudwatch.model.PutMetricDataResult;
+
+
+/**
+ * Simple mock {@link PutCloudWatchMetric} processor for testing.
+ *
+ * <p>Replaces the call to CloudWatch with an in-memory recorder: every invocation is counted, the
+ * namespace and metric data of the last request are kept for inspection, and a preset exception can be
+ * thrown in place of returning a result.</p>
+ */
+public class MockPutCloudWatchMetric extends PutCloudWatchMetric {
+
+    /** Namespace of the most recent request. */
+    protected String actualNamespace;
+    /** Metric data of the most recent request. */
+    protected List<MetricDatum> actualMetricData;
+    /** When non-null, thrown by {@link #putMetricData(PutMetricDataRequest)} after the request is recorded. */
+    protected AmazonClientException throwException;
+    /** Result returned when no exception is configured. */
+    protected PutMetricDataResult result = new PutMetricDataResult();
+    /** Number of times {@link #putMetricData(PutMetricDataRequest)} was invoked. */
+    protected int putMetricDataCallCount = 0;
+
+    /**
+     * Records the request instead of sending it to CloudWatch.
+     *
+     * @param metricDataRequest the request built by the processor
+     * @return the preset {@link #result}
+     * @throws AmazonClientException the preset {@link #throwException}, if any
+     */
+    @Override
+    protected PutMetricDataResult putMetricData(PutMetricDataRequest metricDataRequest) throws AmazonClientException {
+        putMetricDataCallCount++;
+        actualNamespace = metricDataRequest.getNamespace();
+        actualMetricData = metricDataRequest.getMetricData();
+
+        if (throwException != null) {
+            throw throwException;
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/71076164/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
new file mode 100644
index 0000000..dbc7f8c
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-aws-bundle/nifi-aws-processors/src/test/java/org/apache/nifi/processors/aws/cloudwatch/TestPutCloudWatchMetric.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.aws.cloudwatch;
+
+import java.util.Map;
+import java.util.HashMap;
+
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+
+import com.amazonaws.services.cloudwatch.model.InvalidParameterValueException;
+import com.amazonaws.services.cloudwatch.model.MetricDatum;
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Unit tests for {@link PutCloudWatchMetric}, run against {@link MockPutCloudWatchMetric} so that no
+ * request reaches CloudWatch.
+ */
+public class TestPutCloudWatchMetric {
+
+    /** Creates a runner for the processor with the namespace and metric name every test uses. */
+    private static TestRunner newRunner(final MockPutCloudWatchMetric processor) {
+        final TestRunner testRunner = TestRunners.newTestRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.NAMESPACE, "TestNamespace");
+        testRunner.setProperty(PutCloudWatchMetric.METRIC_NAME, "TestMetric");
+        return testRunner;
+    }
+
+    /** Builds an attribute map holding a single entry. */
+    private static Map<String, String> attribute(final String key, final String value) {
+        final Map<String, String> attrs = new HashMap<>();
+        attrs.put(key, value);
+        return attrs;
+    }
+
+    /** Asserts that exactly one request was recorded, carrying the expected namespace, name and value. */
+    private static void assertSingleDatum(final MockPutCloudWatchMetric processor, final double expectedValue) {
+        Assert.assertEquals(1, processor.putMetricDataCallCount);
+        Assert.assertEquals("TestNamespace", processor.actualNamespace);
+        final MetricDatum published = processor.actualMetricData.get(0);
+        Assert.assertEquals("TestMetric", published.getMetricName());
+        Assert.assertEquals(expectedValue, published.getValue(), 0.0001d);
+    }
+
+    /** A fully literal configuration is valid and publishes one datum. */
+    @Test
+    public void testPutSimpleMetric() throws Exception {
+        final MockPutCloudWatchMetric processor = new MockPutCloudWatchMetric();
+        final TestRunner testRunner = newRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "1.0");
+        testRunner.setProperty(PutCloudWatchMetric.UNIT, "Count");
+        testRunner.setProperty(PutCloudWatchMetric.TIMESTAMP, "1476296132575");
+        testRunner.assertValid();
+
+        testRunner.enqueue(new byte[] {});
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
+        assertSingleDatum(processor, 1d);
+    }
+
+    /** A literal value that is not a double makes the processor invalid. */
+    @Test
+    public void testValueLiteralDoubleInvalid() throws Exception {
+        final TestRunner testRunner = newRunner(new MockPutCloudWatchMetric());
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "nan");
+        testRunner.assertNotValid();
+    }
+
+    /** A value supplied through Expression Language is evaluated per FlowFile and published. */
+    @Test
+    public void testMetricExpressionValid() throws Exception {
+        final MockPutCloudWatchMetric processor = new MockPutCloudWatchMetric();
+        final TestRunner testRunner = newRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "${metric.value}");
+        testRunner.assertValid();
+
+        testRunner.enqueue(new byte[] {}, attribute("metric.value", "1.23"));
+        testRunner.run();
+
+        testRunner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_SUCCESS, 1);
+        assertSingleDatum(processor, 1.23d);
+    }
+
+    /** An expression that evaluates to a non-numeric value routes to failure without publishing. */
+    @Test
+    public void testMetricExpressionInvalidRoutesToFailure() throws Exception {
+        final MockPutCloudWatchMetric processor = new MockPutCloudWatchMetric();
+        final TestRunner testRunner = newRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "${metric.value}");
+        testRunner.assertValid();
+
+        testRunner.enqueue(new byte[] {}, attribute("metric.value", "nan"));
+        testRunner.run();
+
+        Assert.assertEquals(0, processor.putMetricDataCallCount);
+        testRunner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1);
+    }
+
+    /** An exception thrown by the publish call routes the FlowFile to failure. */
+    @Test
+    public void testInvalidUnitRoutesToFailure() throws Exception {
+        final MockPutCloudWatchMetric processor = new MockPutCloudWatchMetric();
+        processor.throwException = new InvalidParameterValueException("Unit error message");
+        final TestRunner testRunner = newRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.UNIT, "BogusUnit");
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "1");
+        testRunner.assertValid();
+
+        testRunner.enqueue(new byte[] {});
+        testRunner.run();
+
+        Assert.assertEquals(1, processor.putMetricDataCallCount);
+        testRunner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1);
+    }
+
+    /** A timestamp expression that evaluates to a non-numeric value routes to failure without publishing. */
+    @Test
+    public void testTimestampExpressionInvalidRoutesToFailure() throws Exception {
+        final MockPutCloudWatchMetric processor = new MockPutCloudWatchMetric();
+        final TestRunner testRunner = newRunner(processor);
+        testRunner.setProperty(PutCloudWatchMetric.UNIT, "Count");
+        testRunner.setProperty(PutCloudWatchMetric.VALUE, "1");
+        testRunner.setProperty(PutCloudWatchMetric.TIMESTAMP, "${timestamp.value}");
+        testRunner.assertValid();
+
+        testRunner.enqueue(new byte[] {}, attribute("timestamp.value", "1476296132575broken"));
+        testRunner.run();
+
+        Assert.assertEquals(0, processor.putMetricDataCallCount);
+        testRunner.assertAllFlowFilesTransferred(PutCloudWatchMetric.REL_FAILURE, 1);
+    }
+
+}
\ No newline at end of file

Reply via email to