exceptionfactory commented on a change in pull request #5493:
URL: https://github.com/apache/nifi/pull/5493#discussion_r738754015
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher eventBatcher;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.CHARSET);
+ descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+ descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+ descriptors.add(POOL_RECV_BUFFERS);
+ descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+ descriptors.add(CLIENT_AUTH);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ int maxConnections =
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+ int bufferSize =
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+ InetAddress hostname =
NetworkUtils.getInterfaceAddress(networkInterface);
+ Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+ port =
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+ events = new
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+ errorEvents = new LinkedBlockingQueue<>();
+ final String msgDemarcator = getMessageDemarcator(context);
+ messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+ final NettyEventServerFactory eventFactory = new
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port,
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ final String clientAuthValue =
context.getProperty(CLIENT_AUTH).getValue();
+ ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+ SSLContext sslContext = sslContextService.createContext();
+ if (sslContext != null) {
Review comment:
This null check should not be necessary, the service should throw an
exception if it cannot create an SSLContext.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher eventBatcher;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
Review comment:
This property does not appear to be used, adding a comment above would
be helpful.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher eventBatcher;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.CHARSET);
+ descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+ descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+ descriptors.add(POOL_RECV_BUFFERS);
Review comment:
These two properties no longer appear to be used, but they probably need
to remain in place for backward compatibility. Adding a comment would be
helpful.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -153,56 +268,51 @@
return results;
}
- @Override
- protected ChannelDispatcher createDispatcher(final ProcessContext context,
final BlockingQueue<StandardEvent> events)
- throws IOException {
-
- final int maxConnections =
context.getProperty(MAX_CONNECTIONS).asInteger();
- final int maxThreadPoolSize =
context.getProperty(MAX_RECV_THREAD_POOL_SIZE).isSet()
- ? context.getProperty(MAX_RECV_THREAD_POOL_SIZE).asInteger()
- : maxConnections;
-
- final int bufferSize =
context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
- final Charset charSet =
Charset.forName(context.getProperty(CHARSET).getValue());
-
- // initialize the buffer pool based on max number of connections and
the buffer size
- final ByteBufferSource byteBufferSource =
context.getProperty(POOL_RECV_BUFFERS).asBoolean()
- ? new ByteBufferPool(maxConnections, bufferSize)
- : new ByteBufferFactory(bufferSize);
-
- // if an SSLContextService was provided then create an SSLContext to
pass down to the dispatcher
- SSLContext sslContext = null;
- ClientAuth clientAuth = null;
-
- final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
- if (sslContextService != null) {
- final String clientAuthValue =
context.getProperty(CLIENT_AUTH).getValue();
- sslContext = sslContextService.createContext();
- clientAuth = ClientAuth.valueOf(clientAuthValue);
- }
-
- final EventFactory<StandardEvent> eventFactory = new
StandardEventFactory();
- final ChannelHandlerFactory<StandardEvent<SocketChannel>,
AsyncChannelDispatcher> handlerFactory = new SocketChannelHandlerFactory<>();
- return new SocketChannelDispatcher(eventFactory, handlerFactory,
byteBufferSource, events, getLogger(), maxConnections,
- maxThreadPoolSize, sslContext, clientAuth, charSet);
- }
-
- @Override
protected Map<String, String> getAttributes(final FlowFileEventBatch
batch) {
- final String sender = batch.getEvents().get(0).getSender();
+ final List<ByteArrayMessage> events = batch.getEvents();
+ final String sender = events.get(0).getSender();
final Map<String,String> attributes = new HashMap<>(3);
attributes.put("tcp.sender", sender);
attributes.put("tcp.port", String.valueOf(port));
return attributes;
}
- @Override
protected String getTransitUri(FlowFileEventBatch batch) {
- final String sender = batch.getEvents().get(0).getSender();
+ final List<ByteArrayMessage> events = batch.getEvents();
+ final String sender = events.get(0).getSender();
final String senderHost = sender.startsWith("/") && sender.length() >
1 ? sender.substring(1) : sender;
final String transitUri = new
StringBuilder().append("tcp").append("://").append(senderHost).append(":")
.append(port).toString();
return transitUri;
}
-}
+ @Override
+ public final Set<Relationship> getRelationships() {
+ return this.relationships;
+ }
+
+ @Override
+ public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return descriptors;
+ }
+
+ private String getMessageDemarcator(final ProcessContext context) {
+ return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
+ .getValue()
+ .replace("\\n", "\n").replace("\\r", "\r").replace("\\t",
"\t");
+ }
+
+ private EventBatcher getEventBatcher() {
+ if(eventBatcher != null) {
Review comment:
Recommend correcting the spacing:
```suggestion
if (eventBatcher != null) {
```
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher eventBatcher;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.CHARSET);
+ descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+ descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+ descriptors.add(POOL_RECV_BUFFERS);
+ descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+ descriptors.add(CLIENT_AUTH);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ int maxConnections =
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+ int bufferSize =
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+ InetAddress hostname =
NetworkUtils.getInterfaceAddress(networkInterface);
+ Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+ port =
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+ events = new
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+ errorEvents = new LinkedBlockingQueue<>();
+ final String msgDemarcator = getMessageDemarcator(context);
+ messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+ final NettyEventServerFactory eventFactory = new
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port,
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ final String clientAuthValue =
context.getProperty(CLIENT_AUTH).getValue();
+ ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+ SSLContext sslContext = sslContextService.createContext();
+ if (sslContext != null) {
+ eventFactory.setSslContext(sslContext);
+ eventFactory.setClientAuth(clientAuth);
+ }
+ }
+
+ eventFactory.setSocketReceiveBuffer(bufferSize);
+ eventFactory.setWorkerThreads(maxConnections);
+
+ try {
+ eventServer = eventFactory.getEventServer();
+ } catch (EventException e) {
+ getLogger().error("Failed to bind to [{}:{}].",
hostname.getHostAddress(), port);
+ }
+ }
+
@Override
- protected List<PropertyDescriptor> getAdditionalProperties() {
- return Arrays.asList(
- MAX_CONNECTIONS,
- MAX_RECV_THREAD_POOL_SIZE,
- POOL_RECV_BUFFERS,
- SSL_CONTEXT_SERVICE,
- CLIENT_AUTH
- );
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final int batchSize =
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+ Map<String, FlowFileEventBatch> batches =
getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+ processEvents(session, batches);
+ }
+
+ private void processEvents(final ProcessSession session, final Map<String,
FlowFileEventBatch> batches) {
+ for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet())
{
+ FlowFile flowFile = entry.getValue().getFlowFile();
+ final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+ if (flowFile.getSize() == 0L || events.size() == 0) {
+ session.remove(flowFile);
+ getLogger().debug("No data written to FlowFile from batch {};
removing FlowFile", entry.getKey());
+ continue;
+ }
+
+ final Map<String,String> attributes =
getAttributes(entry.getValue());
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ getLogger().debug("Transferring {} to success", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ session.adjustCounter("FlowFiles Transferred to Success", 1L,
false);
+
+ // the sender and command will be the same for all events based on
the batch key
+ final String transitUri = getTransitUri(entry.getValue());
+ session.getProvenanceReporter().receive(flowFile, transitUri);
+
+ }
+ session.commitAsync();
Review comment:
Is this call necessary? It seems like the standard AbstractProcessor
handling of onTrigger() should be sufficient.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
Review comment:
Is this property used?
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenTCP.java
##########
@@ -112,15 +121,121 @@
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
+ public static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Messages received successfully will be sent out this
relationship.")
+ .build();
+
+ protected static final String RECEIVED_COUNTER = "Messages Received";
+
+ protected List<PropertyDescriptor> descriptors;
+ protected Set<Relationship> relationships;
+ protected volatile int port;
+ protected volatile Charset charset;
+ protected volatile BlockingQueue<ByteArrayMessage> events;
+ protected volatile BlockingQueue<ByteArrayMessage> errorEvents;
+ protected volatile InetAddress hostname;
+ protected volatile EventServer eventServer;
+ protected volatile byte[] messageDemarcatorBytes;
+ protected volatile EventBatcher eventBatcher;
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(ListenerProperties.PORT);
+ descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+ descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+ descriptors.add(ListenerProperties.CHARSET);
+ descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+ descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+ descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+ descriptors.add(MAX_RECV_THREAD_POOL_SIZE);
+ descriptors.add(POOL_RECV_BUFFERS);
+ descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+ descriptors.add(CLIENT_AUTH);
+ descriptors.add(SSL_CONTEXT_SERVICE);
+ this.descriptors = Collections.unmodifiableList(descriptors);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ this.relationships = Collections.unmodifiableSet(relationships);
+ }
+
+ @OnScheduled
+ public void onScheduled(ProcessContext context) throws IOException {
+ int maxConnections =
context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+ int bufferSize =
context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+ final String networkInterface =
context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+ InetAddress hostname =
NetworkUtils.getInterfaceAddress(networkInterface);
+ Charset charset =
Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+ port =
context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+ events = new
LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+ errorEvents = new LinkedBlockingQueue<>();
+ final String msgDemarcator = getMessageDemarcator(context);
+ messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+ final NettyEventServerFactory eventFactory = new
ByteArrayMessageNettyEventServerFactory(getLogger(), hostname, port,
TransportProtocol.TCP, messageDemarcatorBytes, bufferSize, events);
+
+ final SSLContextService sslContextService =
context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null) {
+ final String clientAuthValue =
context.getProperty(CLIENT_AUTH).getValue();
+ ClientAuth clientAuth = ClientAuth.valueOf(clientAuthValue);
+ SSLContext sslContext = sslContextService.createContext();
+ if (sslContext != null) {
+ eventFactory.setSslContext(sslContext);
+ eventFactory.setClientAuth(clientAuth);
+ }
+ }
+
+ eventFactory.setSocketReceiveBuffer(bufferSize);
+ eventFactory.setWorkerThreads(maxConnections);
+
+ try {
+ eventServer = eventFactory.getEventServer();
+ } catch (EventException e) {
+ getLogger().error("Failed to bind to [{}:{}].",
hostname.getHostAddress(), port);
+ }
+ }
+
@Override
- protected List<PropertyDescriptor> getAdditionalProperties() {
- return Arrays.asList(
- MAX_CONNECTIONS,
- MAX_RECV_THREAD_POOL_SIZE,
- POOL_RECV_BUFFERS,
- SSL_CONTEXT_SERVICE,
- CLIENT_AUTH
- );
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) throws ProcessException {
+ final int batchSize =
context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+ Map<String, FlowFileEventBatch> batches =
getEventBatcher().getBatches(session, batchSize, messageDemarcatorBytes);
+ processEvents(session, batches);
+ }
+
+ private void processEvents(final ProcessSession session, final Map<String,
FlowFileEventBatch> batches) {
+ for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet())
{
+ FlowFile flowFile = entry.getValue().getFlowFile();
+ final List<ByteArrayMessage> events = entry.getValue().getEvents();
+
+ if (flowFile.getSize() == 0L || events.size() == 0) {
+ session.remove(flowFile);
+ getLogger().debug("No data written to FlowFile from batch {};
removing FlowFile", entry.getKey());
+ continue;
+ }
+
+ final Map<String,String> attributes =
getAttributes(entry.getValue());
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ getLogger().debug("Transferring {} to success", flowFile);
+ session.transfer(flowFile, REL_SUCCESS);
+ session.adjustCounter("FlowFiles Transferred to Success", 1L,
false);
+
+ // the sender and command will be the same for all events based on
the batch key
Review comment:
This comment is unclear given the following lines, is it necessary?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]