[
https://issues.apache.org/jira/browse/NIFI-987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14990062#comment-14990062
]
ASF GitHub Bot commented on NIFI-987:
-------------------------------------
Github user apiri commented on a diff in the pull request:
https://github.com/apache/nifi/pull/91#discussion_r43916468
--- Diff:
nifi-nar-bundles/nifi-riemann-bundle/nifi-riemann-processors/src/main/java/org/apache/nifi/processors/riemann/PutRiemann.java
---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.riemann;
+
+import com.aphyr.riemann.Proto;
+import com.aphyr.riemann.Proto.Event;
+import com.aphyr.riemann.client.RiemannClient;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+@Tags({"riemann", "monitoring", "metrics"})
+@DynamicProperty(name = "Custom Event Attribute",
supportsExpressionLanguage = true,
+ description = "These values will be attached to the Riemann event as a
custom attribute",
+ value = "Any value or expression")
+@CapabilityDescription("Send events to Riemann")
+@SupportsBatching
+public class PutRiemann extends AbstractProcessor {
+ protected enum Transport {
+ TCP, UDP
+ }
+
+ protected RiemannClient riemannClient = null;
+ protected Transport transport;
+
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Metrics successfully written to Riemann")
+ .build();
+
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Metrics which failed to write to Riemann")
+ .build();
+
+
+ public static final PropertyDescriptor RIEMANN_HOST = new
PropertyDescriptor.Builder()
+ .name("Riemann Address")
+ .description("Hostname of Riemann server")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor RIEMANN_PORT = new
PropertyDescriptor.Builder()
+ .name("Riemann Port")
+ .description("Port that Riemann is listening on")
+ .required(true)
+ .defaultValue("5555")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor TRANSPORT_PROTOCOL = new
PropertyDescriptor.Builder()
+ .name("Transport Protocol")
+ .description("Transport protocol to speak to Riemann in")
+ .required(true)
+ .allowableValues(new Transport[]{Transport.TCP, Transport.UDP})
+ .defaultValue("TCP")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor BATCH_SIZE = new
PropertyDescriptor.Builder()
+ .name("Batch Size")
+ .description("Batch size for incoming FlowFiles")
+ .required(false)
+ .defaultValue("100")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .addValidator(StandardValidators.INTEGER_VALIDATOR)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .build();
+
+ // Attributes Mappings
+ public static final PropertyDescriptor ATTR_SERVICE = new
PropertyDescriptor.Builder()
+ .name("Service")
+ .description("Name of service for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_STATE = new
PropertyDescriptor.Builder()
+ .name("State")
+ .description("State of service for the event (e.g. ok, warning, or
critical)")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_TIME = new
PropertyDescriptor.Builder()
+ .name("Time")
+ .description("Time for event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_HOST = new
PropertyDescriptor.Builder()
+ .name("Host")
+ .description("Name of host for the event")
+ .required(false)
+ .defaultValue("${hostname()}")
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor ATTR_TTL = new
PropertyDescriptor.Builder()
+ .name("TTL")
+ .description("Time to live for the event")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_METRIC = new
PropertyDescriptor.Builder()
+ .name("Metric")
+ .description("Metric for the event")
+ .required(false)
+ .addValidator(Validator.VALID)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final PropertyDescriptor ATTR_DESCRIPTION = new
PropertyDescriptor.Builder()
+ .name("Description")
+ .description("Description for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+
+ public static final PropertyDescriptor ATTR_TAGS = new
PropertyDescriptor.Builder()
+ .name("Tags")
+ .description("Comma separated list of tags for the event")
+ .required(false)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .build();
+
+ public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Timeout")
+ .description("Timeout in milliseconds when writing events to Riemann")
+ .required(true)
+ .defaultValue("1000")
+ .addValidator(StandardValidators.POSITIVE_LONG_VALIDATOR)
+ .build();
+
+ private List<PropertyDescriptor> customAttributes = new ArrayList<>();
+ private static final Set<Relationship> relationships = new HashSet<>();
+ private static final List<PropertyDescriptor> localProperties = new
ArrayList<>();
+
+ private int batchSize = -1;
+ private long writeTimeout = 1000;
+
+ static {
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ localProperties.add(RIEMANN_HOST);
+ localProperties.add(RIEMANN_PORT);
+ localProperties.add(TRANSPORT_PROTOCOL);
+ localProperties.add(TIMEOUT);
+ localProperties.add(BATCH_SIZE);
+ localProperties.add(ATTR_DESCRIPTION);
+ localProperties.add(ATTR_SERVICE);
+ localProperties.add(ATTR_STATE);
+ localProperties.add(ATTR_METRIC);
+ localProperties.add(ATTR_TTL);
+ localProperties.add(ATTR_TAGS);
+ localProperties.add(ATTR_HOST);
+ localProperties.add(ATTR_TIME);
+ }
+
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return localProperties;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .expressionLanguageSupported(true)
+ .addValidator(Validator.VALID)
+ .required(false)
+ .dynamic(true)
+ .build();
+ }
+
+ @OnStopped
+ public final void cleanUpClient() {
+ if (riemannClient != null) {
+ this.riemannClient.close();
+ }
+ this.riemannClient = null;
+ this.batchSize = -1;
+ this.customAttributes.clear();
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws ProcessException {
+ if (batchSize == -1) {
+ batchSize = context.getProperty(BATCH_SIZE).asInteger();
+ }
+ if (riemannClient == null || !riemannClient.isConnected()) {
+ transport =
Transport.valueOf(context.getProperty(TRANSPORT_PROTOCOL).getValue());
+ String host = context.getProperty(RIEMANN_HOST).getValue().trim();
+ int port = context.getProperty(RIEMANN_PORT).asInteger();
+ writeTimeout = context.getProperty(TIMEOUT).asLong();
+ RiemannClient client = null;
+ try {
+ switch (transport) {
+ case TCP:
+ client = RiemannClient.tcp(host, port);
+ break;
+ case UDP:
+ client = RiemannClient.udp(host, port);
+ break;
+ }
+ client.connect();
--- End diff --
I believe this is the culprit in what works out to be a resource leak.
While I was testing against my sample instance (running in a Docker
container), the server went down and I quickly ended up with an OOME.
```
2015-11-04 11:31:19,111 ERROR [Timer-Driven Process Thread-1]
o.a.nifi.processors.riemann.PutRiemann
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] Failed to process session
due to org.apache.nifi.processor.exception.ProcessException: Unable to connect
to Riemann [192.168.99.100:5555] (TCP)
Connection failed: org.apache.nifi.processor.exception.ProcessException:
Unable to connect to Riemann [192.168.99.100:5555] (TCP)
Connection failed
2015-11-04 11:31:20,114 ERROR [Timer-Driven Process Thread-5]
o.a.nifi.processors.riemann.PutRiemann
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d]
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] failed to process due to
java.lang.OutOfMemoryError: unable to create new native thread; rolling back
session: java.lang.OutOfMemoryError: unable to create new native thread
2015-11-04 11:31:20,114 ERROR [Timer-Driven Process Thread-5]
o.a.nifi.processors.riemann.PutRiemann
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d]
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] failed to process session
due to java.lang.OutOfMemoryError: unable to create new native thread:
java.lang.OutOfMemoryError: unable to create new native thread
2015-11-04 11:31:20,114 WARN [Timer-Driven Process Thread-5]
o.a.nifi.processors.riemann.PutRiemann
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] Processor Administratively
Yielded for 1 sec due to processing failure
2015-11-04 11:31:20,115 WARN [Timer-Driven Process Thread-5]
o.a.n.c.t.ContinuallyRunProcessorTask Administratively Yielding
PutRiemann[id=db08f8c1-a7a1-41fc-96c4-36e350bbb44d] due to uncaught Exception:
java.lang.OutOfMemoryError: unable to create new native thread
2015-11-04 11:31:20,119 WARN [Timer-Driven Process Thread-5]
o.a.n.c.t.ContinuallyRunProcessorTask
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method) [na:1.8.0_60]
at java.lang.Thread.start(Thread.java:714) [na:1.8.0_60]
at
java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
[na:1.8.0_60]
at
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1368)
[na:1.8.0_60]
at
org.jboss.netty.util.internal.DeadLockProofWorker.start(DeadLockProofWorker.java:38)
~[na:na]
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.openSelector(AbstractNioSelector.java:344)
~[na:na]
at
org.jboss.netty.channel.socket.nio.AbstractNioSelector.<init>(AbstractNioSelector.java:96)
~[na:na]
at
org.jboss.netty.channel.socket.nio.AbstractNioWorker.<init>(AbstractNioWorker.java:51)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioWorker.<init>(NioWorker.java:45) ~[na:na]
at
org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:45)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioWorkerPool.createWorker(NioWorkerPool.java:28)
~[na:na]
at
org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.newWorker(AbstractNioWorkerPool.java:99)
~[na:na]
at
org.jboss.netty.channel.socket.nio.AbstractNioWorkerPool.init(AbstractNioWorkerPool.java:69)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:39)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioWorkerPool.<init>(NioWorkerPool.java:33)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:152)
~[na:na]
at
org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory.<init>(NioClientSocketChannelFactory.java:117)
~[na:na]
at com.aphyr.riemann.client.TcpTransport.connect(TcpTransport.java:151)
~[na:na]
at
com.aphyr.riemann.client.RiemannClient.connect(RiemannClient.java:155) ~[na:na]
at
org.apache.nifi.processors.riemann.PutRiemann.onScheduled(PutRiemann.java:260)
~[na:na]
at
org.apache.nifi.processors.riemann.PutRiemann.onTrigger(PutRiemann.java:286)
~[na:na]
at
org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
~[nifi-api-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
at
org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1134)
~[nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:127)
[nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
at
org.apache.nifi.controller.tasks.ContinuallyRunProcessorTask.call(ContinuallyRunProcessorTask.java:49)
[nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
at
org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:119)
[nifi-framework-core-0.3.1-SNAPSHOT.jar:0.3.1-SNAPSHOT]
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
[na:1.8.0_60]
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
[na:1.8.0_60]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
[na:1.8.0_60]
at
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
[na:1.8.0_60]
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[na:1.8.0_60]
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[na:1.8.0_60]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
```
From a cursory look through the code, it seems that when that
IOException occurs, the associated client should also be closed.
> Add Processor for Writing Events to Riemann
> -------------------------------------------
>
> Key: NIFI-987
> URL: https://issues.apache.org/jira/browse/NIFI-987
> Project: Apache NiFi
> Issue Type: New Feature
> Reporter: Ricky Saltzer
> Assignee: Ricky Saltzer
> Fix For: 0.4.0
>
> Attachments: Sample Riemann Dataflow .png
>
>
> Riemann (http://riemann.io) is a new framework for monitoring distributed
> systems. It's particularly useful for sending ad-hoc events such as heartbeats
> and metrics. It would be nice if NiFi had a PutRiemann processor for writing
> events using the NiFi expression language.
> A simple use case would be a data flow that repeatedly checks specific
> services over TCP, HTTP, etc., and checks in to Riemann. Another example would
> be detecting a blip in events coming down a stream using a simple event
> heartbeat mechanism. I'll post a couple of visuals to help make these examples
> more concrete.
> I have an initial PutRiemann processor made. I will post the patch via a
> Github pull request later today.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)