http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
deleted file mode 100644
index 07b5dcc..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler.socket;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Base class for socket channel handlers.
- */
-public abstract class SocketChannelHandler<E extends Event<SocketChannel>> 
extends ChannelHandler<E, AsyncChannelDispatcher> {
-
-    static final byte TCP_DELIMITER = '\n';
-
-    public SocketChannelHandler(final SelectionKey key,
-                                final AsyncChannelDispatcher dispatcher,
-                                final Charset charset,
-                                final EventFactory<E> eventFactory,
-                                final BlockingQueue<E> events,
-                                final ComponentLog logger) {
-        super(key, dispatcher, charset, eventFactory, events, logger);
-    }
-
-    /**
-     * @return the byte used as the delimiter between messages for the given 
handler
-     */
-    public abstract byte getDelimiter();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
deleted file mode 100644
index 9003f90..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler.socket;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Default factory for creating socket channel handlers.
- */
-public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> 
implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
-
-    @Override
-    public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final 
SelectionKey key,
-                                           final AsyncChannelDispatcher 
dispatcher,
-                                           final Charset charset,
-                                           final EventFactory<E> eventFactory,
-                                           final BlockingQueue<E> events,
-                                           final ComponentLog logger) {
-        return new StandardSocketChannelHandler<>(key, dispatcher, charset, 
eventFactory, events, logger);
-    }
-
-    @Override
-    public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final 
SelectionKey key,
-                                              final AsyncChannelDispatcher 
dispatcher,
-                                              final Charset charset,
-                                              final EventFactory<E> 
eventFactory,
-                                              final BlockingQueue<E> events,
-                                              final ComponentLog logger) {
-        return new SSLSocketChannelHandler<>(key, dispatcher, charset, 
eventFactory, events, logger);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
deleted file mode 100644
index 250168c..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.handler.socket;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import 
org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import 
org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Reads from the given SocketChannel into the provided buffer. If the given 
delimiter is found, the data
- * read up to that point is queued for processing.
- */
-public class StandardSocketChannelHandler<E extends Event<SocketChannel>> 
extends SocketChannelHandler<E> {
-
-    private final ByteArrayOutputStream currBytes = new 
ByteArrayOutputStream(4096);
-
-    public StandardSocketChannelHandler(final SelectionKey key,
-                                        final AsyncChannelDispatcher 
dispatcher,
-                                        final Charset charset,
-                                        final EventFactory<E> eventFactory,
-                                        final BlockingQueue<E> events,
-                                        final ComponentLog logger) {
-        super(key, dispatcher, charset, eventFactory, events, logger);
-    }
-
-    @Override
-    public void run() {
-        boolean eof = false;
-        SocketChannel socketChannel = null;
-
-        try {
-            int bytesRead;
-            socketChannel = (SocketChannel) key.channel();
-
-            final SocketChannelAttachment attachment = 
(SocketChannelAttachment) key.attachment();
-            final ByteBuffer socketBuffer = attachment.getByteBuffer();
-
-            // read until the buffer is full
-            while ((bytesRead = socketChannel.read(socketBuffer)) > 0) {
-                // prepare byte buffer for reading
-                socketBuffer.flip();
-                // mark the current position as start, in case of partial 
message read
-                socketBuffer.mark();
-                // process the contents that have been read into the buffer
-                processBuffer(socketChannel, socketBuffer);
-
-                // Preserve bytes in buffer for next call to run
-                // NOTE: This code could benefit from the  two ByteBuffer read 
calls to avoid
-                // this compact for higher throughput
-                socketBuffer.reset();
-                socketBuffer.compact();
-                logger.debug("bytes read {}", new Object[]{bytesRead});
-            }
-
-            // Check for closed socket
-            if( bytesRead < 0 ){
-                eof = true;
-                logger.debug("Reached EOF, closing connection");
-            } else {
-                logger.debug("No more data available, returning for 
selection");
-            }
-        } catch (ClosedByInterruptException | InterruptedException e) {
-            logger.debug("read loop interrupted, closing connection");
-            // Treat same as closed socket
-            eof = true;
-        } catch (ClosedChannelException e) {
-            // ClosedChannelException doesn't have a message so handle it 
separately from IOException
-            logger.error("Error reading from channel due to channel being 
closed", e);
-            // Treat same as closed socket
-            eof = true;
-        } catch (IOException e) {
-            logger.error("Error reading from channel due to {}", new Object[] 
{e.getMessage()}, e);
-            // Treat same as closed socket
-            eof = true;
-        } finally {
-            if(eof == true) {
-                IOUtils.closeQuietly(socketChannel);
-                dispatcher.completeConnection(key);
-            } else {
-                dispatcher.addBackForSelection(key);
-            }
-        }
-    }
-
-    /**
-     * Process the contents that have been read into the buffer. Allow 
sub-classes to override this behavior.
-     *
-     * @param socketChannel the channel the data was read from
-     * @param socketBuffer the buffer the data was read into
-     * @throws InterruptedException if interrupted when queuing events
-     */
-    protected void processBuffer(final SocketChannel socketChannel, final 
ByteBuffer socketBuffer) throws InterruptedException, IOException {
-        // get total bytes in buffer
-        final int total = socketBuffer.remaining();
-        final InetAddress sender = socketChannel.socket().getInetAddress();
-
-        // go through the buffer looking for the end of each message
-        currBytes.reset();
-        for (int i = 0; i < total; i++) {
-            // NOTE: For higher throughput, the looking for \n and copying 
into the byte stream could be improved
-            // Pull data out of buffer and cram into byte array
-            byte currByte = socketBuffer.get();
-
-            // check if at end of a message
-            if (currByte == getDelimiter()) {
-                if (currBytes.size() > 0) {
-                    final SocketChannelResponder response = new 
SocketChannelResponder(socketChannel);
-                    final Map<String, String> metadata = 
EventFactoryUtil.createMapWithSender(sender.toString());
-                    final E event = 
eventFactory.create(currBytes.toByteArray(), metadata, response);
-                    events.offer(event);
-                    currBytes.reset();
-
-                    // Mark this as the start of the next message
-                    socketBuffer.mark();
-                }
-            } else {
-                currBytes.write(currByte);
-            }
-        }
-    }
-
-    @Override
-    public byte getDelimiter() {
-        return TCP_DELIMITER;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
deleted file mode 100644
index 978f3ac..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.response;
-
-import java.io.IOException;
-import java.nio.channels.SelectableChannel;
-import java.util.List;
-
-/**
- * A responder for a given channel.
- *
- * @param <C> The type of SelectableChannel where the response will be written.
- */
-public interface ChannelResponder<C extends SelectableChannel> {
-
-    /**
-     * @return a SelectableChannel to write the response to
-     */
-    C getChannel();
-
-    /**
-     * @return a list of responses to write to the channel
-     */
-    List<ChannelResponse> getResponses();
-
-    /**
-     * @param response adds the given response to the list of responses
-     */
-    void addResponse(ChannelResponse response);
-
-    /**
-     * Writes the responses to the underlying channel.
-     */
-    void respond() throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
deleted file mode 100644
index 98f0301..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.response;
-
-/**
- * A response to send back over channel.
- */
-public interface ChannelResponse {
-
-    /**
-     * @return the bytes that should be written to a channel for this response
-     */
-    byte[] toByteArray();
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
deleted file mode 100644
index 20102ba..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.response.socket;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-import java.io.IOException;
-import java.nio.channels.SocketChannel;
-
-/**
- * A ChannelResponder for SSLSocketChannels.
- */
-public class SSLSocketChannelResponder extends SocketChannelResponder {
-
-    private SSLSocketChannel sslSocketChannel;
-
-    public SSLSocketChannelResponder(final SocketChannel socketChannel, final 
SSLSocketChannel sslSocketChannel) {
-        super(socketChannel);
-        this.sslSocketChannel = sslSocketChannel;
-    }
-
-    @Override
-    public void respond() throws IOException {
-        for (final ChannelResponse response : responses) {
-            sslSocketChannel.write(response.toByteArray());
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
deleted file mode 100644
index 5c20bf0..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.listen.response.socket;
-
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A ChannelResponder for SocketChannels. The SocketChannel should first be 
registered with a selector,
- * upon being selected for writing the respond() method should be executed.
- */
-public class SocketChannelResponder implements ChannelResponder<SocketChannel> 
{
-
-    protected final List<ChannelResponse> responses;
-    protected final SocketChannel socketChannel;
-
-    public SocketChannelResponder(final SocketChannel socketChannel) {
-        this.responses = new ArrayList<>();
-        this.socketChannel = socketChannel;
-    }
-
-    @Override
-    public SocketChannel getChannel() {
-        return socketChannel;
-    }
-
-    @Override
-    public List<ChannelResponse> getResponses() {
-        return Collections.unmodifiableList(responses);
-    }
-
-    @Override
-    public void addResponse(ChannelResponse response) {
-        this.responses.add(response);
-    }
-
-    @Override
-    public void respond() throws IOException {
-        for (final ChannelResponse response : responses) {
-            final ByteBuffer responseBuffer = 
ByteBuffer.wrap(response.toByteArray());
-
-            while (responseBuffer.hasRemaining()) {
-                socketChannel.write(responseBuffer);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
deleted file mode 100644
index f97f31d..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-/**
- * Represents a looping process was discontinued.
- * When a method throws this exception, its caller should stop processing 
further inputs and stop immediately.
- */
-public class DiscontinuedException extends RuntimeException {
-    public DiscontinuedException(String message) {
-        super(message);
-    }
-
-    public DiscontinuedException(String message, Throwable cause) {
-        super(message, cause);
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
deleted file mode 100644
index c6cf140..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure;
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException;
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry;
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self;
-import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None;
-import static 
org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize;
-import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield;
-
-/**
- * Represents general error types and how it should be treated.
- */
-public enum ErrorTypes {
-
-    /**
-     * Procedure setting has to be fixed, otherwise the same error would occur 
irrelevant to the input.
-     * In order to NOT call failing process frequently, this should be yielded.
-     */
-    PersistentFailure(ProcessException, Yield),
-
-    /**
-     * It is unknown whether the error is persistent or temporal, related to 
the input or not.
-     */
-    UnknownFailure(ProcessException, None),
-
-    /**
-     * The input will be sent to the failure route for recovery without 
penalizing.
-     * Basically, the input should not be sent to the same procedure again 
unless the issue has been solved.
-     */
-    InvalidInput(Failure, None),
-
-    /**
-     * The procedure is temporarily unavailable, usually due to the external 
service unavailability.
-     * Retrying maybe successful, but it should be yielded for a while.
-     */
-    TemporalFailure(Retry, Yield),
-
-    /**
-     * The input was not processed successfully due to some temporal error
-     * related to the specifics of the input. Retrying maybe successful,
-     * but it should be penalized for a while.
-     */
-    TemporalInputFailure(Retry, Penalize),
-
-    /**
-     * The input was not ready for being processed. It will be kept in the 
incoming queue and also be penalized.
-     */
-    Defer(Self, Penalize);
-
-    private final Destination destination;
-    private final Penalty penalty;
-    ErrorTypes(Destination destination, Penalty penalty){
-        this.destination = destination;
-        this.penalty = penalty;
-    }
-
-    public Result result() {
-        return new Result(destination, penalty);
-    }
-
-    /**
-     * Represents the destination of input.
-     */
-    public enum Destination {
-        ProcessException, Failure, Retry, Self
-    }
-
-    /**
-     * Indicating yield or penalize the processing when transfer the input.
-     */
-    public enum Penalty {
-        Yield, Penalize, None
-    }
-
-    public Destination destination(){
-        return this.destination;
-    }
-
-    public Penalty penalty(){
-        return this.penalty;
-    }
-
-    /**
-     * Result represents a result of a procedure.
-     * ErrorTypes enum contains basic error result patterns.
-     */
-    public static class Result {
-        private final Destination destination;
-        private final Penalty penalty;
-
-        public Result(Destination destination, Penalty penalty) {
-            this.destination = destination;
-            this.penalty = penalty;
-        }
-
-        public Destination destination() {
-            return destination;
-        }
-
-        public Penalty penalty() {
-            return penalty;
-        }
-
-        @Override
-        public String toString() {
-            return "Result{" +
-                    "destination=" + destination +
-                    ", penalty=" + penalty +
-                    '}';
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            Result result = (Result) o;
-
-            if (destination != result.destination) return false;
-            return penalty == result.penalty;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = destination != null ? destination.hashCode() : 0;
-            result = 31 * result + (penalty != null ? penalty.hashCode() : 0);
-            return result;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
deleted file mode 100644
index bd1c9eb..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.pattern.ErrorTypes.Result;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
-/**
- * <p>ExceptionHandler provides a structured Exception handling logic composed 
by reusable partial functions.
- *
- * <p>
- *     Benefits of using ExceptionHandler:
- *     <li>Externalized error handling code which provides cleaner program 
only focusing on the expected path.</li>
- *     <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated 
error handling based on error type.</li>
- *     <li>Context aware error handling, {@link RollbackOnFailure} for 
instance.</li>
- * </p>
- */
-public class ExceptionHandler<C> {
-
-    @FunctionalInterface
-    public interface Procedure<I> {
-        void apply(I input) throws Exception;
-    }
-
-    public interface OnError<C, I> {
-        void apply(C context, I input, Result result, Exception e);
-
-        default OnError<C, I> andThen(OnError<C, I> after) {
-            return (c, i, r, e) -> {
-                apply(c, i, r, e);
-                after.apply(c, i, r, e);
-            };
-        }
-    }
-
-    /**
-     * Simply categorise an Exception.
-     */
-    private Function<Exception, ErrorTypes> mapException;
-
-    /**
-     * Adjust error type based on the context.
-     */
-    private BiFunction<C, ErrorTypes, Result> adjustError;
-
-    /**
-     * Do some action to the input based on the final error type.
-     */
-    private OnError<C, ?> onError;
-
-    /**
-     * Specify a function that maps an Exception to certain ErrorType.
-     */
-    public void mapException(Function<Exception, ErrorTypes> mapException) {
-        this.mapException = mapException;
-    }
-
-    /**
-     * <p>Specify a function that adjust ErrorType based on a function context.
-     * <p>For example, {@link 
RollbackOnFailure#createAdjustError(ComponentLog)} decides
-     * whether a process session should rollback or transfer input to failure 
or retry.
-     */
-    public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) {
-        this.adjustError = adjustError;
-    }
-
-    /**
-     * <p>Specify a default OnError function that will be called if one is not 
explicitly specified when {@link #execute(Object, Object, Procedure)} is called.
-     */
-    public void onError(OnError<C, ?> onError) {
-        this.onError = onError;
-    }
-
-    /**
-     * <p>Executes specified procedure function with the input.
-     * <p>Default OnError function will be called when an exception is thrown.
-     * @param context function context
-     * @param input input for procedure
-     * @param procedure a function that does something with the input
-     * @return True if the procedure finished without issue. False if 
procedure threw an Exception but it was handled by {@link OnError}.
-     * @throws ProcessException Thrown if the exception was not handled by 
{@link OnError}
-     * @throws DiscontinuedException Indicating the exception was handled by 
{@link OnError} but process should stop immediately
-     * without processing any further input
-     */
-    @SuppressWarnings("unchecked")
-    public <I> boolean execute(C context, I input, Procedure<I> procedure) 
throws ProcessException, DiscontinuedException {
-        return execute(context, input, procedure, (OnError<C, I>) onError);
-    }
-
-    /**
-     * <p>Executes specified procedure function with the input.
-     * @param context function context
-     * @param input input for procedure
-     * @param procedure a function that does something with the input
-     * @param onError specify {@link OnError} function for this execution
-     * @return True if the procedure finished without issue. False if 
procedure threw an Exception but it was handled by {@link OnError}.
-     * @throws ProcessException Thrown if the exception was not handled by 
{@link OnError}
-     * @throws DiscontinuedException Indicating the exception was handled by 
{@link OnError} but process should stop immediately
-     * without processing any further input
-     */
-    public <I> boolean execute(C context, I input, Procedure<I> procedure, 
OnError<C, I> onError) throws ProcessException, DiscontinuedException {
-        try {
-            procedure.apply(input);
-            return true;
-        } catch (Exception e) {
-
-            if (mapException == null) {
-                throw new ProcessException("An exception was thrown: " + e, e);
-            }
-
-            final ErrorTypes type = mapException.apply(e);
-
-            final Result result;
-            if (adjustError != null) {
-                result = adjustError.apply(context, type);
-            } else {
-                result = new Result(type.destination(), type.penalty());
-            }
-
-            if (onError == null) {
-                throw new IllegalStateException("OnError is not set.");
-            }
-
-            onError.apply(context, input, result, e);
-        }
-        return false;
-    }
-
-    private static FlowFile penalize(final ProcessContext context, final 
ProcessSession session,
-                                     final FlowFile flowFile, final 
ErrorTypes.Penalty penalty) {
-        switch (penalty) {
-            case Penalize:
-                return session.penalize(flowFile);
-            case Yield:
-                context.yield();
-        }
-        return flowFile;
-    }
-
-    /**
-     * Create a {@link OnError} function instance that routes input based on 
{@link Result} destination and penalty.
-     * @param context process context is used to yield a processor
-     * @param session process session is used to penalize a FlowFile
-     * @param routingResult input FlowFile will be routed to a destination 
relationship in this {@link RoutingResult}
-     * @param relFailure specify failure relationship of a processor
-     * @param relRetry specify retry relationship of a processor
-     * @return composed function
-     */
-    public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError(
-            final ProcessContext context, final ProcessSession session, final 
RoutingResult routingResult,
-            final Relationship relFailure, final Relationship relRetry) {
-
-        return (fc, input, result, e) -> {
-            final PartialFunctions.FlowFileGroup flowFileGroup = () -> 
Collections.singletonList(input);
-            createOnGroupError(context, session, routingResult, relFailure, 
relRetry).apply(fc, flowFileGroup, result, e);
-        };
-    }
-
-    /**
-     * Same as {@link #createOnError(ProcessContext, ProcessSession, 
RoutingResult, Relationship, Relationship)} for FlowFileGroup.
-     * @param context process context is used to yield a processor
-     * @param session process session is used to penalize FlowFiles
-     * @param routingResult input FlowFiles will be routed to a destination 
relationship in this {@link RoutingResult}
-     * @param relFailure specify failure relationship of a processor
-     * @param relRetry specify retry relationship of a processor
-     * @return composed function
-     */
-    public static <C, I extends PartialFunctions.FlowFileGroup> 
ExceptionHandler.OnError<C, I> createOnGroupError(
-            final ProcessContext context, final ProcessSession session, final 
RoutingResult routingResult,
-            final Relationship relFailure, final Relationship relRetry) {
-        return (c, g, r, e) -> {
-            final Relationship routeTo;
-            switch (r.destination()) {
-                case Failure:
-                    routeTo = relFailure;
-                    break;
-                case Retry:
-                    routeTo = relRetry;
-                    break;
-                case Self:
-                    routeTo = Relationship.SELF;
-                    break;
-                default:
-                    if (e instanceof ProcessException) {
-                        throw (ProcessException)e;
-                    } else {
-                        Object inputs = null;
-                        if (g != null) {
-                            final List<FlowFile> flowFiles = g.getFlowFiles();
-                            switch (flowFiles.size()) {
-                                case 0:
-                                    inputs = "[]";
-                                    break;
-                                case 1:
-                                    inputs = flowFiles.get(0);
-                                    break;
-                                default:
-                                    inputs = String.format("%d FlowFiles 
including %s", flowFiles.size(), flowFiles.get(0));
-                                    break;
-                            }
-                        }
-                        throw new ProcessException(String.format("Failed to 
process %s due to %s", inputs, e), e);
-                    }
-            }
-            for (FlowFile f : g.getFlowFiles()) {
-                final FlowFile maybePenalized = penalize(context, session, f, 
r.penalty());
-                routingResult.routeTo(maybePenalized, routeTo);
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
deleted file mode 100644
index 8332289..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.util.List;
-
-/**
- * This class contains various partial functions those are reusable among 
process patterns.
- */
-public class PartialFunctions {
-
-    @FunctionalInterface
-    public interface InitConnection<FC, C> {
-        C apply(ProcessContext context, ProcessSession session, FC 
functionContext) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface FetchFlowFiles<FC> {
-        List<FlowFile> apply(ProcessContext context, ProcessSession session, 
FC functionContext, RoutingResult result) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface OnCompleted<FC, C> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface OnFailed<FC, C> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection, Exception e) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface Cleanup<FC, C> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface FlowFileGroup {
-        List<FlowFile> getFlowFiles();
-    }
-
-    @FunctionalInterface
-    public interface AdjustRoute<FC> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, RoutingResult result) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface TransferFlowFiles<FC> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, RoutingResult result) throws ProcessException;
-
-        default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) {
-            return (context, session, functionContext, result) -> {
-                apply(context, session, functionContext, result);
-                after.apply(context, session, functionContext, result);
-            };
-        }
-    }
-
-    public static <FCT> PartialFunctions.FetchFlowFiles<FCT> 
fetchSingleFlowFile() {
-        return (context, session, functionContext, result) -> session.get(1);
-    }
-
-    public static <FCT> PartialFunctions.TransferFlowFiles<FCT> 
transferRoutedFlowFiles() {
-        return (context, session, functionContext, result)
-                -> result.getRoutedFlowFiles().forEach(((relationship, 
routedFlowFiles)
-                -> session.transfer(routedFlowFiles, relationship)));
-    }
-
-    @FunctionalInterface
-    public interface OnTrigger {
-        void execute(ProcessSession session) throws ProcessException;
-    }
-
-    @FunctionalInterface
-    public interface RollbackSession {
-        void rollback(ProcessSession session, Throwable t);
-    }
-
-    /**
-     * <p>This method is identical to what {@link 
org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, 
ProcessSession)} does.</p>
-     * <p>Create a session from ProcessSessionFactory and execute specified 
onTrigger function, and commit the session if onTrigger finishes 
successfully.</p>
-     * <p>When an Exception is thrown during execution of the onTrigger, the 
session will be rollback. FlowFiles being processed will be penalized.</p>
-     */
-    public static void onTrigger(ProcessContext context, ProcessSessionFactory 
sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws 
ProcessException {
-        onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> 
session.rollback(true));
-    }
-
-    public static void onTrigger(
-            ProcessContext context, ProcessSessionFactory sessionFactory, 
ComponentLog logger, OnTrigger onTrigger,
-            RollbackSession rollbackSession) throws ProcessException {
-        final ProcessSession session = sessionFactory.createSession();
-        try {
-            onTrigger.execute(session);
-            session.commit();
-        } catch (final Throwable t) {
-            logger.error("{} failed to process due to {}; rolling back 
session", new Object[]{onTrigger, t});
-            rollbackSession.rollback(session, t);
-            throw t;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
deleted file mode 100644
index 790f48a..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java
+++ /dev/null
@@ -1,228 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.util.List;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * Abstract Put pattern class with a generic onTrigger method structure, 
composed with various partial functions.
- * @param <FC> Class of context instance which is passed to each partial 
functions.
- *            Lifetime of an function context should be limited for a single 
onTrigger method.
- * @param <C> Class of connection to a data storage that this pattern puts 
data into.
- */
-public class Put<FC, C extends AutoCloseable> {
-    protected PartialFunctions.InitConnection<FC, C> initConnection;
-    protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = 
PartialFunctions.fetchSingleFlowFile();
-    protected PutFlowFile<FC, C> putFlowFile;
-    protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = 
PartialFunctions.transferRoutedFlowFiles();
-    protected PartialFunctions.AdjustRoute<FC> adjustRoute;
-    protected PartialFunctions.OnCompleted<FC, C> onCompleted;
-    protected PartialFunctions.OnFailed<FC, C> onFailed;
-    protected PartialFunctions.Cleanup<FC, C> cleanup;
-    protected ComponentLog logger;
-
-    /**
-     * Put fetched FlowFiles to a data storage.
-     * @param context process context passed from a Processor onTrigger.
-     * @param session process session passed from a Processor onTrigger.
-     * @param functionContext function context passed from a Processor 
onTrigger.
-     * @param connection connection to data storage, established by {@link 
PartialFunctions.InitConnection}.
-     * @param flowFiles FlowFiles fetched from {@link 
PartialFunctions.FetchFlowFiles}.
-     * @param result Route incoming FlowFiles if necessary.
-     */
-    protected void putFlowFiles(ProcessContext context, ProcessSession session,
-                                        FC functionContext, C connection, 
List<FlowFile> flowFiles, RoutingResult result) throws ProcessException {
-        for (FlowFile flowFile : flowFiles) {
-            putFlowFile.apply(context, session, functionContext, connection, 
flowFile, result);
-        }
-    }
-
-    protected void validateCompositePattern() {
-        Objects.requireNonNull(initConnection, "InitConnection function is 
required.");
-        Objects.requireNonNull(putFlowFile, "PutFlowFile function is 
required.");
-        Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function 
is required.");
-    }
-
-    /**
-     * <p>Processor using this pattern is expected to call this method from 
its onTrigger.
-     * <p>Typical usage would be constructing a process pattern instance at a 
processor method
-     * which is annotated with {@link 
org.apache.nifi.annotation.lifecycle.OnScheduled},
-     * and use pattern.onTrigger from processor.onTrigger.
-     * <p>{@link PartialFunctions.InitConnection} is required at least. In 
addition to any functions required by an implementation class.
-     * @param context process context passed from a Processor onTrigger.
-     * @param session process session passed from a Processor onTrigger.
-     * @param functionContext function context should be instantiated per 
onTrigger call.
-     * @throws ProcessException Each partial function can throw 
ProcessException if onTrigger should stop immediately.
-     */
-    public void onTrigger(ProcessContext context, ProcessSession session, FC 
functionContext) throws ProcessException {
-
-        validateCompositePattern();
-
-        final RoutingResult result = new RoutingResult();
-        final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, 
session, functionContext, result);
-
-        // Transfer FlowFiles if there is any.
-        result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) ->
-                session.transfer(routedFlowFiles, relationship)));
-
-        if (flowFiles == null || flowFiles.isEmpty()) {
-            logger.debug("No incoming FlowFiles.");
-            return;
-        }
-
-        try (C connection = initConnection.apply(context, session, 
functionContext)) {
-
-            try {
-                // Execute the core function.
-                try {
-                    putFlowFiles(context, session, functionContext, 
connection, flowFiles, result);
-                } catch (DiscontinuedException e) {
-                    // Whether it was an error or semi normal is depends on 
the implementation and reason why it wanted to discontinue.
-                    // So, no logging is needed here.
-                }
-
-                // Extension point to alter routes.
-                if (adjustRoute != null) {
-                    adjustRoute.apply(context, session, functionContext, 
result);
-                }
-
-                // Put fetched, but unprocessed FlowFiles back to self.
-                final List<FlowFile> transferredFlowFiles = 
result.getRoutedFlowFiles().values().stream()
-                        .flatMap(List::stream).collect(Collectors.toList());
-                final List<FlowFile> unprocessedFlowFiles = flowFiles.stream()
-                        .filter(flowFile -> 
!transferredFlowFiles.contains(flowFile)).collect(Collectors.toList());
-                result.routeTo(unprocessedFlowFiles, Relationship.SELF);
-
-                // OnCompleted processing.
-                if (onCompleted != null) {
-                    onCompleted.apply(context, session, functionContext, 
connection);
-                }
-
-                // Transfer FlowFiles.
-                transferFlowFiles.apply(context, session, functionContext, 
result);
-
-            } catch (Exception e) {
-                if (onFailed != null) {
-                    onFailed.apply(context, session, functionContext, 
connection, e);
-                }
-                throw e;
-            } finally {
-                if (cleanup != null) {
-                    cleanup.apply(context, session, functionContext, 
connection);
-                }
-            }
-
-        } catch (ProcessException e) {
-            throw e;
-        } catch (Exception e) {
-            // Throw uncaught exception as RuntimeException so that this 
processor will be yielded.
-            final String msg = String.format("Failed to execute due to %s", e);
-            logger.error(msg, e);
-            throw new RuntimeException(msg, e);
-        }
-
-    }
-
-    /**
-     * Specify an optional function that fetches incoming FlowFIles.
-     * If not specified, single FlowFile is fetched on each onTrigger.
-     * @param f Function to fetch incoming FlowFiles.
-     */
-    public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) {
-        fetchFlowFiles = f;
-    }
-
-    /**
-     * Specify a function that establishes a connection to target data storage.
-     * This function will be called when there is valid incoming FlowFiles.
-     * The created connection instance is automatically closed when onTrigger 
is finished.
-     * @param f Function to initiate a connection to a data storage.
-     */
-    public void initConnection(PartialFunctions.InitConnection<FC, C> f) {
-        initConnection = f;
-    }
-
-    /**
-     * Specify a function that puts an incoming FlowFile to target data 
storage.
-     * @param f a function to put a FlowFile to target storage.
-     */
-    public void putFlowFile(PutFlowFile<FC, C> f) {
-        this.putFlowFile = f;
-    }
-
-    /**
-     * Specify an optional function that adjust routed FlowFiles before 
transfer it.
-     * @param f a function to adjust route.
-     */
-    public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) {
-        this.adjustRoute = f;
-    }
-
-    /**
-     * Specify an optional function responsible for transferring routed 
FlowFiles.
-     * If not specified routed FlowFiles are simply transferred to its 
destination by default.
-     * @param f a function to transfer routed FlowFiles.
-     */
-    public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) {
-        this.transferFlowFiles = f;
-    }
-
-    /**
-     * Specify an optional function which will be called if input FlowFiles 
were successfully put to a target storage.
-     * @param f Function to be called when a put operation finishes 
successfully.
-     */
-    public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) {
-        onCompleted = f;
-    }
-
-    /**
-     * Specify an optional function which will be called if input FlowFiles 
failed being put to a target storage.
-     * @param f Function to be called when a put operation failed.
-     */
-    public void onFailed(PartialFunctions.OnFailed<FC, C> f) {
-        onFailed = f;
-    }
-
-    /**
-     * Specify an optional function which will be called in a finally block.
-     * Typically useful when a special cleanup operation is needed for the 
connection.
-     * @param f Function to be called when a put operation finished regardless 
of whether it succeeded or not.
-     */
-    public void cleanup(PartialFunctions.Cleanup<FC, C> f) {
-        cleanup = f;
-    }
-
-    public void setLogger(ComponentLog logger) {
-        this.logger = logger;
-    }
-
-    @FunctionalInterface
-    public interface PutFlowFile<FC, C> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection,
-                   FlowFile flowFile, RoutingResult result) throws 
ProcessException;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
deleted file mode 100644
index 6e9da2e..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.exception.ProcessException;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Extended Put pattern capable of handling FlowFile groups.
- * @param <FC> Function context class.
- * @param <C> Connection class.
- * @param <FFG> FlowFileGroup class.
- */
-public class PutGroup<FC, C extends AutoCloseable, FFG extends 
PartialFunctions.FlowFileGroup> extends Put<FC, C> {
-
-
-    public PutGroup() {
-        // Just to make a composition valid.
-        this.putFlowFile = (context, session, functionContext, connection, 
inputFlowFile, result) -> {
-            throw new UnsupportedOperationException();
-        };
-    }
-
-    @FunctionalInterface
-    public interface PutFlowFiles<FC, C, FFG> {
-        void apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection,
-                            FFG inputFlowFileGroup, RoutingResult result) 
throws ProcessException;
-    }
-
-    @Override
-    protected void validateCompositePattern() {
-        super.validateCompositePattern();
-        Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is 
required.");
-    }
-
-    /**
-     * PutGroup does not support PutFileFile function for single FlowFile.
-     * Throws UnsupportedOperationException if called.
-     */
-    @Override
-    public void putFlowFile(PutFlowFile<FC, C> putFlowFile) {
-        throw new UnsupportedOperationException("PutFlowFile can not be used 
with PutGroup pattern. Specify PutFlowFiles instead.");
-    }
-
-    @FunctionalInterface
-    public interface GroupFlowFiles<FC, C, FFG> {
-        List<FFG> apply(ProcessContext context, ProcessSession session, FC 
functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) 
throws ProcessException;
-    }
-
-    private GroupFlowFiles<FC, C, FFG> groupFlowFiles;
-    private PutFlowFiles<FC, C, FFG> putFlowFiles;
-
-    /**
-     * Specify a function that groups input FlowFiles into FlowFile groups.
-     */
-    public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) {
-        groupFlowFiles = f;
-    }
-
-    /**
-     * Specify a function that puts an input FlowFile group to a target 
storage using a given connection.
-     */
-    public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) {
-        putFlowFiles = f;
-    }
-
-
-    @Override
-    protected void putFlowFiles(ProcessContext context, ProcessSession 
session, FC functionContext,
-                               C connection, List<FlowFile> flowFiles, 
RoutingResult result) throws ProcessException {
-        final List<FFG> flowFileGroups = groupFlowFiles
-                .apply(context, session, functionContext, connection, 
flowFiles, result);
-
-        for (FFG group : flowFileGroups) {
-            putFlowFiles.apply(context, session, functionContext, connection, 
group, result);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
deleted file mode 100644
index 2d4d768..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute;
-
-import java.util.List;
-import java.util.Map;
-import java.util.function.BiFunction;
-
-/**
- * <p>Function context for process patterns such as {@link Put} that provides configurable error handling.</p>
- *
- * <p>RollbackOnFailure adds the following characteristics to a processor:</p>
- * <ul>
- *     <li>When disabled, input FlowFiles that caused an error are routed to the 'failure' or 'retry'
- *         relationship, based on the type of error.</li>
- *     <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown
- *         to roll back the process session.</li>
- *     <li>If this context is marked as transactional, everything that happened during a processor's
- *         onTrigger is assumed to be able to roll back.</li>
- *     <li>If transactional and enabled, the session is rolled back when an error occurs,
- *         even if some FlowFiles were already processed.</li>
- *     <li>If not transactional and enabled, the session is rolled back when an error occurs
- *         only if no progress has been made.</li>
- * </ul>
- *
- * <p>There are two approaches to apply RollbackOnFailure. One is using
- * {@link ExceptionHandler#adjustError(BiFunction)}, and the other is implementing the processor's onTrigger
- * with process patterns such as {@link Put#adjustRoute(AdjustRoute)}.</p>
- *
- * <p>Both approaches can be combined. ExceptionHandler applies immediately when an Exception is thrown,
- * while AdjustRoute responds later but requires less code.</p>
- */
-public class RollbackOnFailure {
-
-    /** Property descriptor created with an empty additional description. */
-    public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty("");
-
-    private final boolean rollbackOnFailure;
-    private final boolean transactional;
-    private boolean discontinue;
-
-    private int processedCount = 0;
-
-    /**
-     * Constructor.
-     * @param rollbackOnFailure Should be set by the user via processor configuration.
-     * @param transactional Specify whether a processor is transactional.
-     *                      If not, it is important to call {@link #proceed()} after each successful
-     *                      execution of the processor's task, to indicate that the processor made an
-     *                      operation that can not be undone.
-     */
-    public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) {
-        this.rollbackOnFailure = rollbackOnFailure;
-        this.transactional = transactional;
-    }
-
-    /**
-     * Builds the 'Rollback On Failure' property descriptor.
-     *
-     * @param additionalDescription text appended to the end of the standard description
-     * @return a required boolean property named 'rollback-on-failure' that defaults to "false"
-     */
-    public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) {
-        final String description = "Specify how to handle error."
-                + " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to"
-                + " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile."
-                + " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately."
-                + " In that case, you can do so by enabling this 'Rollback On Failure' property. "
-                + " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly"
-                + " until it gets processed successfully or removed by other means."
-                + " It is important to set adequate 'Yield Duration' to avoid retrying too frequently."
-                + additionalDescription;
-
-        return new PropertyDescriptor.Builder()
-                .name("rollback-on-failure")
-                .displayName("Rollback On Failure")
-                .description(description)
-                .allowableValues("true", "false")
-                .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
-                .defaultValue("false")
-                .required(true)
-                .build();
-    }
-
-    /**
-     * Create a function to use with {@link ExceptionHandler} that adjusts the error type based on the function context.
-     *
-     * @param logger used to report, at debug level, every adjustment that is made
-     * @return a function returning the adjusted result, or the error type's own result when no adjustment applies
-     */
-    public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) {
-        return (functionContext, errorType) -> {
-            final ErrorTypes.Result adjusted = adjustResult(functionContext, errorType);
-            if (adjusted == null) {
-                // Nothing to adjust, keep what the error type prescribes.
-                return errorType.result();
-            }
-
-            if (logger.isDebugEnabled()) {
-                logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}",
-                        new Object[]{errorType, adjusted, functionContext.isRollbackOnFailure(),
-                                functionContext.getProcessedCount(), functionContext.isTransactional()});
-            }
-            return adjusted;
-        };
-    }
-
-    /**
-     * Decides whether the result of the given error type has to be replaced for the given context.
-     * Marks the context as discontinued whenever a replacement is returned.
-     *
-     * @return the replacement result, or {@code null} if the error type's own result should be used
-     */
-    private static ErrorTypes.Result adjustResult(final RollbackOnFailure functionContext, final ErrorTypes errorType) {
-        switch (errorType.destination()) {
-
-            case ProcessException: {
-                if (functionContext.canRollback()) {
-                    // The session can be rolled back, so the exception is left as is.
-                    return null;
-                }
-                // An exception is thrown, but the processor is not transactional and processed count > 0.
-                // Throwing in this state would also roll back the FlowFiles that already succeeded,
-                // so route to self instead and stop any further processing until this input succeeds.
-                // Progress made by preceding inputs is kept: those go to 'success' while this input
-                // stays in the incoming queue. If this input itself partially updated an external system,
-                // the partial update will be replayed, which can cause duplicated data.
-                functionContext.discontinue();
-                // Yield rather than penalize: a penalized FlowFile would let other FlowFiles be fetched first,
-                // but the others have to be blocked until this one finishes.
-                return new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield);
-            }
-
-            case Failure:
-            case Retry: {
-                if (!functionContext.isRollbackOnFailure()) {
-                    return null;
-                }
-                functionContext.discontinue();
-                // If the session can be rolled back, throw a ProcessException in order to roll back;
-                // otherwise keep the FlowFile in the incoming queue.
-                final ErrorTypes.Destination destination = functionContext.canRollback()
-                        ? ErrorTypes.Destination.ProcessException
-                        : ErrorTypes.Destination.Self;
-                return new ErrorTypes.Result(destination, ErrorTypes.Penalty.Yield);
-            }
-
-            default:
-                return null;
-        }
-    }
-
-    /**
-     * Create an {@link AdjustRoute} function to use with a process pattern such as {@link Put} that adjusts
-     * routed FlowFiles based on the context.
-     * This function works as a safety net, covering cases where a Processor implementation did not use
-     * ExceptionHandler and transferred FlowFiles without considering the RollbackOnFailure context.
-     *
-     * @param failureRelationships the relationships regarded as failure destinations
-     */
-    public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) {
-        return (context, session, functionContext, routingResult) -> {
-            if (!functionContext.isRollbackOnFailure()) {
-                return;
-            }
-
-            // Look for FlowFiles routed to any of the failure relationships.
-            for (Relationship relationship : failureRelationships) {
-                if (routingResult.contains(relationship)) {
-                    if (functionContext.canRollback()) {
-                        throw new ProcessException(String.format(
-                                "A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s",
-                                relationship.getName(), functionContext.isRollbackOnFailure(),
-                                functionContext.getProcessedCount(), functionContext.isTransactional()));
-                    }
-
-                    // Rollback is not possible: take the failed FlowFiles out of the failure route and send them to self.
-                    final List<FlowFile> failed = routingResult.getRoutedFlowFiles().remove(relationship);
-                    routingResult.routeTo(failed, Relationship.SELF);
-                }
-            }
-        };
-    }
-
-    /**
-     * Wraps the given error handler so that, after it has run, a {@link DiscontinuedException} is thrown
-     * if the function context has been marked to discontinue.
-     *
-     * @param onError the error handler to run first
-     * @return the combined error handler
-     */
-    public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) {
-        return onError.andThen((functionContext, input, result, cause) -> {
-            if (functionContext.shouldDiscontinue()) {
-                throw new DiscontinuedException("Discontinue processing due to " + cause, cause);
-            }
-        });
-    }
-
-    /**
-     * Runs the given onTrigger function through {@link PartialFunctions}, supplying a rollback handler
-     * that takes the RollbackOnFailure setting into account.
-     *
-     * @throws ProcessException declared by the signature
-     */
-    public static <FCT extends RollbackOnFailure> void onTrigger(
-            ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger,
-            PartialFunctions.OnTrigger onTrigger) throws ProcessException {
-
-        PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, cause) -> {
-            // With RollbackOnFailure enabled, FlowFiles are not penalized on rollback,
-            // so that they stay in the incoming relationship to be processed again.
-            session.rollback(!functionContext.isRollbackOnFailure());
-
-            if (!functionContext.isRollbackOnFailure()) {
-                return;
-            }
-
-            // Keeping the failed FlowFile in the incoming relationship would retry it too often,
-            // so administratively yield the processor.
-            logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), cause}, cause);
-            context.yield();
-        });
-    }
-
-    /**
-     * Increments the processed count.
-     * @return the processed count after the increment
-     */
-    public int proceed() {
-        processedCount += 1;
-        return processedCount;
-    }
-
-    /**
-     * @return the number of times {@link #proceed()} has been called
-     */
-    public int getProcessedCount() {
-        return processedCount;
-    }
-
-    /**
-     * @return the rollbackOnFailure flag given to the constructor
-     */
-    public boolean isRollbackOnFailure() {
-        return rollbackOnFailure;
-    }
-
-    /**
-     * @return the transactional flag given to the constructor
-     */
-    public boolean isTransactional() {
-        return transactional;
-    }
-
-    /**
-     * @return true if this context is transactional, or if nothing has been processed yet
-     */
-    public boolean canRollback() {
-        if (transactional) {
-            return true;
-        }
-        return processedCount == 0;
-    }
-
-    /**
-     * @return true once {@link #discontinue()} has been called
-     */
-    public boolean shouldDiscontinue() {
-        return discontinue;
-    }
-
-    /**
-     * Marks this context so that {@link #shouldDiscontinue()} returns true.
-     */
-    public void discontinue() {
-        this.discontinue = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
deleted file mode 100644
index 200d893..0000000
--- 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processor.util.pattern;
-
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.Relationship;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Collects FlowFiles per {@link Relationship} they are routed to.
- */
-public class RoutingResult {
-
-    private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>();
-
-    /**
-     * Adds a single FlowFile to the list kept for the given relationship,
-     * creating that list if the relationship has none yet.
-     */
-    public void routeTo(final FlowFile flowFile, final Relationship relationship) {
-        final List<FlowFile> routed = routedFlowFiles.computeIfAbsent(relationship, key -> new ArrayList<>());
-        routed.add(flowFile);
-    }
-
-    /**
-     * Adds all given FlowFiles to the list kept for the given relationship,
-     * creating that list if the relationship has none yet.
-     */
-    public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) {
-        final List<FlowFile> routed = routedFlowFiles.computeIfAbsent(relationship, key -> new ArrayList<>());
-        routed.addAll(flowFiles);
-    }
-
-    /**
-     * Routes every FlowFile held by the given result to the same relationship in this result.
-     */
-    public void merge(final RoutingResult r) {
-        for (final Map.Entry<Relationship, List<FlowFile>> entry : r.getRoutedFlowFiles().entrySet()) {
-            routeTo(entry.getValue(), entry.getKey());
-        }
-    }
-
-    /**
-     * @return the backing map itself, not a copy; changes made to it are visible to this result
-     */
-    public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() {
-        return routedFlowFiles;
-    }
-
-    /**
-     * @return true if at least one FlowFile is routed to the given relationship
-     */
-    public boolean contains(Relationship relationship) {
-        if (!routedFlowFiles.containsKey(relationship)) {
-            return false;
-        }
-        return !routedFlowFiles.get(relationship).isEmpty();
-    }
-}

Reply via email to