[ 
https://issues.apache.org/jira/browse/NIFI-1273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111706#comment-15111706
 ] 

ASF GitHub Bot commented on NIFI-1273:
--------------------------------------

Github user trkurc commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/179#discussion_r50489401
  
    --- Diff: 
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
 ---
    @@ -0,0 +1,356 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processor.util.listen;
    +
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.DataUnit;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
    +import org.apache.nifi.processor.util.listen.event.Event;
    +
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.nio.charset.Charset;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.BlockingQueue;
    +import java.util.concurrent.LinkedBlockingQueue;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * An abstract processor to extend from when listening for events over a 
channel. This processor
    + * will start a ChannelDispatcher, and optionally a 
ChannelResponseDispatcher, in a background
    + * thread which will end up placing events on a queue to polled by the 
onTrigger method. Sub-classes
    + * are responsible for providing the dispatcher implementations.
    + *
    + * @param <E> the type of events being produced
    + */
    +public abstract class AbstractListenEventProcessor<E extends Event> 
extends AbstractProcessor {
    +
    +    public static final PropertyDescriptor PORT = new PropertyDescriptor
    +            .Builder().name("Port")
    +            .description("The port to listen on for communication.")
    +            .required(true)
    +            .addValidator(StandardValidators.PORT_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor CHARSET = new 
PropertyDescriptor.Builder()
    +            .name("Character Set")
    +            .description("Specifies the character set of the received 
data.")
    +            .required(true)
    +            .defaultValue("UTF-8")
    +            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
    +            .build();
    +    public static final PropertyDescriptor RECV_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Receive Buffer Size")
    +            .description("The size of each buffer used to receive 
messages. Adjust this value appropriately based on the expected size of the " +
    +                    "incoming messages.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("65507 B")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Max Size of Socket Buffer")
    +            .description("The maximum size of the socket buffer that 
should be used. This is a suggestion to the Operating System " +
    +                    "to indicate how big the socket buffer should be. If 
this value is set too low, the buffer may fill up before " +
    +                    "the data can be read, and incoming data will be 
dropped.")
    +            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
    +            .defaultValue("1 MB")
    +            .required(true)
    +            .build();
    +
    +    // Putting these properties here so sub-classes don't have to redefine 
them, but they are
    +    // not added to the properties by default since not all processors may 
need them
    +
    +    public static final PropertyDescriptor MAX_CONNECTIONS = new 
PropertyDescriptor.Builder()
    +            .name("Max Number of TCP Connections")
    +            .description("The maximum number of concurrent TCP connections 
to accept.")
    +            .addValidator(StandardValidators.createLongValidator(1, 65535, 
true))
    +            .defaultValue("2")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor MAX_BATCH_SIZE = new 
PropertyDescriptor.Builder()
    +            .name("Max Batch Size")
    +            .description(
    +                    "The maximum number of messages to add to a single 
FlowFile. If multiple messages are available, they will be concatenated along 
with "
    +                            + "the <Message Delimiter> up to this 
configured maximum number of messages")
    +            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
    +            .expressionLanguageSupported(false)
    +            .defaultValue("1")
    +            .required(true)
    +            .build();
    +    public static final PropertyDescriptor MESSAGE_DELIMITER = new 
PropertyDescriptor.Builder()
    +            .name("Message Delimiter")
    +            .description("Specifies the delimiter to place between 
messages when multiple messages are bundled together (see <Max Batch Size> 
property).")
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .defaultValue("\\n")
    +            .required(true)
    +            .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("Messages received successfully will be sent out 
this relationship.")
    +            .build();
    +
    +    private Set<Relationship> relationships;
    +    private List<PropertyDescriptor> descriptors;
    +
    +    protected volatile int port;
    +    protected volatile Charset charset;
    +    protected volatile ChannelDispatcher dispatcher;
    +    protected volatile BlockingQueue<E> events = new 
LinkedBlockingQueue<>(10);
    +    protected volatile BlockingQueue<E> errorEvents = new 
LinkedBlockingQueue<>();
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        final List<PropertyDescriptor> descriptors = new ArrayList<>();
    +        descriptors.add(PORT);
    +        descriptors.add(RECV_BUFFER_SIZE);
    +        descriptors.add(MAX_SOCKET_BUFFER_SIZE);
    +        descriptors.add(CHARSET);
    +        descriptors.addAll(getAdditionalProperties());
    +        this.descriptors = Collections.unmodifiableList(descriptors);
    +
    +        final Set<Relationship> relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.addAll(getAdditionalRelationships());
    +        this.relationships = Collections.unmodifiableSet(relationships);
    +    }
    +
    +    /**
    +     * Override to provide additional relationships for the processor.
    +     *
    +     * @return a list of relationships
    +     */
    +    protected List<Relationship> getAdditionalRelationships() {
    +        return Collections.EMPTY_LIST;
    +    }
    +
    +    /**
    +     * Override to provide additional properties for the processor.
    +     *
    +     * @return a list of properties
    +     */
    +    protected List<PropertyDescriptor> getAdditionalProperties() {
    +        return Collections.EMPTY_LIST;
    +    }
    +
    +    @Override
    +    public final Set<Relationship> getRelationships() {
    +        return this.relationships;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return descriptors;
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(final ProcessContext context) throws 
IOException {
    +        charset = Charset.forName(context.getProperty(CHARSET).getValue());
    +        port = context.getProperty(PORT).asInteger();
    +
    +        final int maxChannelBufferSize = 
context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
    +
    +        // create the dispatcher and call open() to bind to the given port
    +        dispatcher = createDispatcher(context, events);
    +        dispatcher.open(port, maxChannelBufferSize);
    +
    +        // start a thread to run the dispatcher
    +        final Thread readerThread = new Thread(dispatcher);
    +        readerThread.setName(getClass().getName() + " [" + getIdentifier() 
+ "]");
    +        readerThread.setDaemon(true);
    +        readerThread.start();
    +    }
    +
    +    /**
    +     * @param context the ProcessContext to retrieve property values from
    +     * @return a ChannelDispatcher to handle incoming connections
    +     *
    +     * @throws IOException if unable to listen on the requested port
    +     */
    +    protected abstract ChannelDispatcher createDispatcher(final 
ProcessContext context, final BlockingQueue<E> events) throws IOException;
    +
    +    // used for testing to access the random port that was selected
    +    public final int getDispatcherPort() {
    +        return dispatcher == null ? 0 : dispatcher.getPort();
    +    }
    +
    +    public int getErrorQueueSize() {
    +        return errorEvents.size();
    +    }
    +
    +    @OnUnscheduled
    +    public void onUnscheduled() {
    +        if (dispatcher != null) {
    +            dispatcher.stop();
    +            dispatcher.close();
    +        }
    +    }
    +
    +    /**
    +     * If pollErrorQueue is true, the error queue will be checked first 
and event will be
    +     * returned from the error queue if available.
    +     *
    +     * If pollErrorQueue is false, or no data is in the error queue, the 
regular queue is polled.
    +     *
    +     * If longPoll is true, the regular queue will be polled with a short 
timeout, otherwise it will
    +     * poll with no timeout which will return immediately.
    +     *
    +     * @param longPoll whether or not to poll the main queue with a small 
timeout
    +     * @param pollErrorQueue whether or not to poll the error queue first
    +     *
    +     * @return an event from one of the queues, or null if none are 
available
    +     */
    +    protected E getMessage(final boolean longPoll, final boolean 
pollErrorQueue) {
    +        E event = null;
    +        if (pollErrorQueue) {
    +            event = errorEvents.poll();
    +        }
    +
    +        if (event == null) {
    +            try {
    +                if (longPoll) {
    +                    event = events.poll(100, TimeUnit.MILLISECONDS);
    --- End diff --
    
    next chance we get, should replace this 100 with a constant.


> Add support for RELP in ListenSyslog
> ------------------------------------
>
>                 Key: NIFI-1273
>                 URL: https://issues.apache.org/jira/browse/NIFI-1273
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Tony Kurc
>            Assignee: Bryan Bende
>            Priority: Minor
>             Fix For: 0.5.0
>
>
> Add support for listening for syslog events using The Reliable Event Logging 
> Protocol (RELP) [1]
> http://www.rsyslog.com/doc/relp.html



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to