Github user bbende commented on a diff in the pull request:
https://github.com/apache/nifi/pull/233#discussion_r54134089
--- Diff:
nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/put/AbstractPutEventProcessor.java
---
@@ -0,0 +1,474 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.put;
+
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processor.util.put.sender.ChannelSender;
+import org.apache.nifi.processor.util.put.sender.DatagramChannelSender;
+import org.apache.nifi.processor.util.put.sender.SSLSocketChannelSender;
+import org.apache.nifi.processor.util.put.sender.SocketChannelSender;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A base class for processors that send data to an external system using
TCP or UDP.
+ */
+public abstract class AbstractPutEventProcessor extends
AbstractSessionFactoryProcessor {
+
+ public static final PropertyDescriptor HOSTNAME = new
PropertyDescriptor.Builder()
+ .name("Hostname")
+ .description("The ip address or hostname of the destination.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .defaultValue("localhost")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor PORT = new PropertyDescriptor
+ .Builder().name("Port")
+ .description("The port on the destination.")
+ .required(true)
+ .addValidator(StandardValidators.PORT_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor MAX_SOCKET_SEND_BUFFER_SIZE =
new PropertyDescriptor.Builder()
+ .name("Max Size of Socket Send Buffer")
+ .description("The maximum size of the socket send buffer that
should be used. This is a suggestion to the Operating System " +
+ "to indicate how big the socket buffer should be. If
this value is set too low, the buffer may fill up before " +
+ "the data can be read, and incoming data will be
dropped.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .required(true)
+ .build();
+ public static final PropertyDescriptor CHARSET = new
PropertyDescriptor.Builder()
+ .name("Character Set")
+ .description("Specifies the character set of the data being
sent.")
+ .required(true)
+ .defaultValue("UTF-8")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor TIMEOUT = new
PropertyDescriptor.Builder()
+ .name("Timeout")
+ .description("The timeout for connecting to and communicating
with the destination. Does not apply to UDP")
+ .required(false)
+ .defaultValue("10 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+ public static final PropertyDescriptor IDLE_EXPIRATION = new
PropertyDescriptor
+ .Builder().name("Idle Connection Expiration")
+ .description("The amount of time a connection should be held
open without being used before closing the connection.")
+ .required(true)
+ .defaultValue("5 seconds")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .build();
+
+ // Putting these properties here so sub-classes don't have to redefine
them, but they are
+ // not added to the properties by default since not all processors may
need them
+
+ public static final AllowableValue TCP_VALUE = new
AllowableValue("TCP", "TCP");
+ public static final AllowableValue UDP_VALUE = new
AllowableValue("UDP", "UDP");
+
+ public static final PropertyDescriptor PROTOCOL = new
PropertyDescriptor
+ .Builder().name("Protocol")
+ .description("The protocol for communication.")
+ .required(true)
+ .allowableValues(TCP_VALUE, UDP_VALUE)
+ .defaultValue(UDP_VALUE.getValue())
+ .build();
+ public static final PropertyDescriptor MESSAGE_DELIMITER = new
PropertyDescriptor.Builder()
+ .name("Message Delimiter")
+ .description("Specifies the delimiter to use for splitting
apart multiple messages within a single FlowFile. "
+ + "If not specified, the entire content of the
FlowFile will be used as a single message. "
+ + "If specified, the contents of the FlowFile will be
split on this delimiter and each section "
+ + "sent as a separate message. Note that if messages
are delimited and some messages for a given FlowFile "
+ + "are transferred successfully while others are not,
the messages will be split into individual FlowFiles, such that those "
+ + "messages that were successfully sent are routed to
the 'success' relationship while other messages are sent to the 'failure' "
+ + "relationship.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("FlowFiles that are sent successfully to the
destination are sent out this relationship.")
+ .build();
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("FlowFiles that failed to send to the destination
are sent out this relationship.")
+ .build();
+
+ private Set<Relationship> relationships;
+ private List<PropertyDescriptor> descriptors;
+
+ protected volatile String transitUri;
+ protected volatile BlockingQueue<ChannelSender> senderPool;
+
+ protected final BlockingQueue<FlowFileMessageBatch> completeBatches =
new LinkedBlockingQueue<>();
+ protected final Set<FlowFileMessageBatch> activeBatches =
Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>());
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(HOSTNAME);
+ descriptors.add(PORT);
+ descriptors.add(MAX_SOCKET_SEND_BUFFER_SIZE);
+ descriptors.add(CHARSET);
+ descriptors.add(TIMEOUT);
+ descriptors.add(IDLE_EXPIRATION);
+ descriptors.addAll(getAdditionalProperties());
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ relationships.addAll(getAdditionalRelationships());
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ /**
+ * Override to provide additional relationships for the processor.
+ *
+ * @return a list of relationships
+ */
+ protected List<Relationship> getAdditionalRelationships() {
+ return Collections.EMPTY_LIST;
+ }
+
+ /**
+ * Override to provide additional properties for the processor.
+ *
+ * @return a list of properties
+ */
+ protected List<PropertyDescriptor> getAdditionalProperties() {
+ return Collections.EMPTY_LIST;
+ }
--- End diff --
The idea was to make getSupportedPropertyDescriptors() final so that the
properties provided by the base class had to be there, and then provide
getAdditionalProperties() to allow a subclass to add additional properties.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---