http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java deleted file mode 100644 index 07b5dcc..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandler.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandler; - -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Base class for socket channel handlers. - */ -public abstract class SocketChannelHandler<E extends Event<SocketChannel>> extends ChannelHandler<E, AsyncChannelDispatcher> { - - static final byte TCP_DELIMITER = '\n'; - - public SocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - /** - * @return the byte used as the delimiter between messages for the given handler - */ - public abstract byte getDelimiter(); - -}
http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java deleted file mode 100644 index 9003f90..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/SocketChannelHandlerFactory.java +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.handler.ChannelHandler; -import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory; - -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.concurrent.BlockingQueue; - -/** - * Default factory for creating socket channel handlers. - */ -public class SocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> { - - @Override - public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - return new StandardSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - return new SSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java deleted file mode 100644 index 250168c..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/handler/socket/StandardSocketChannelHandler.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.handler.socket; - -import org.apache.commons.io.IOUtils; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher; -import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelAttachment; -import org.apache.nifi.processor.util.listen.event.Event; -import org.apache.nifi.processor.util.listen.event.EventFactory; -import org.apache.nifi.processor.util.listen.event.EventFactoryUtil; -import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.net.InetAddress; -import java.nio.ByteBuffer; -import java.nio.channels.ClosedByInterruptException; -import java.nio.channels.ClosedChannelException; -import java.nio.channels.SelectionKey; -import java.nio.channels.SocketChannel; -import java.nio.charset.Charset; -import java.util.Map; -import java.util.concurrent.BlockingQueue; - -/** - * Reads from the given SocketChannel into the provided buffer. If the given delimiter is found, the data - * read up to that point is queued for processing. - */ -public class StandardSocketChannelHandler<E extends Event<SocketChannel>> extends SocketChannelHandler<E> { - - private final ByteArrayOutputStream currBytes = new ByteArrayOutputStream(4096); - - public StandardSocketChannelHandler(final SelectionKey key, - final AsyncChannelDispatcher dispatcher, - final Charset charset, - final EventFactory<E> eventFactory, - final BlockingQueue<E> events, - final ComponentLog logger) { - super(key, dispatcher, charset, eventFactory, events, logger); - } - - @Override - public void run() { - boolean eof = false; - SocketChannel socketChannel = null; - - try { - int bytesRead; - socketChannel = (SocketChannel) key.channel(); - - final SocketChannelAttachment attachment = (SocketChannelAttachment) key.attachment(); - final ByteBuffer socketBuffer = attachment.getByteBuffer(); - - // read until the buffer is full - while ((bytesRead = socketChannel.read(socketBuffer)) > 0) { - // prepare byte buffer for reading - socketBuffer.flip(); - // mark the current position as start, in case of partial message read - socketBuffer.mark(); - // process the contents that have been read into the buffer - processBuffer(socketChannel, socketBuffer); - - // Preserve bytes in buffer for next call to run - // NOTE: This code could benefit from the two ByteBuffer read calls to avoid - // this compact for higher throughput - socketBuffer.reset(); - socketBuffer.compact(); - logger.debug("bytes read {}", new Object[]{bytesRead}); - } - - // Check for closed socket - if( bytesRead < 0 ){ - eof = true; - logger.debug("Reached EOF, closing connection"); - } else { - logger.debug("No more data available, returning for selection"); - } - } catch (ClosedByInterruptException | InterruptedException e) { - logger.debug("read loop interrupted, closing connection"); - // Treat same as closed socket - eof = true; - } catch (ClosedChannelException e) { - // ClosedChannelException doesn't have a message so handle it separately from IOException - logger.error("Error reading from channel due to channel being closed", e); - // Treat same as closed socket - eof = true; - } catch (IOException e) { - logger.error("Error reading from channel due to {}", new Object[] {e.getMessage()}, e); - // Treat same as closed socket - eof = true; - } finally { - if(eof == true) { - IOUtils.closeQuietly(socketChannel); - dispatcher.completeConnection(key); - } else { - dispatcher.addBackForSelection(key); - } - } - } - - /** - * Process the contents that have been read into the buffer. Allow sub-classes to override this behavior. - * - * @param socketChannel the channel the data was read from - * @param socketBuffer the buffer the data was read into - * @throws InterruptedException if interrupted when queuing events - */ - protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer) throws InterruptedException, IOException { - // get total bytes in buffer - final int total = socketBuffer.remaining(); - final InetAddress sender = socketChannel.socket().getInetAddress(); - - // go through the buffer looking for the end of each message - currBytes.reset(); - for (int i = 0; i < total; i++) { - // NOTE: For higher throughput, the looking for \n and copying into the byte stream could be improved - // Pull data out of buffer and cram into byte array - byte currByte = socketBuffer.get(); - - // check if at end of a message - if (currByte == getDelimiter()) { - if (currBytes.size() > 0) { - final SocketChannelResponder response = new SocketChannelResponder(socketChannel); - final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString()); - final E event = eventFactory.create(currBytes.toByteArray(), metadata, response); - events.offer(event); - currBytes.reset(); - - // Mark this as the start of the next message - socketBuffer.mark(); - } - } else { - currBytes.write(currByte); - } - } - } - - @Override - public byte getDelimiter() { - return TCP_DELIMITER; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java deleted file mode 100644 index 978f3ac..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponder.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response; - -import java.io.IOException; -import java.nio.channels.SelectableChannel; -import java.util.List; - -/** - * A responder for a given channel. - * - * @param <C> The type of SelectableChannel where the response will be written. - */ -public interface ChannelResponder<C extends SelectableChannel> { - - /** - * @return a SelectableChannel to write the response to - */ - C getChannel(); - - /** - * @return a list of responses to write to the channel - */ - List<ChannelResponse> getResponses(); - - /** - * @param response adds the given response to the list of responses - */ - void addResponse(ChannelResponse response); - - /** - * Writes the responses to the underlying channel. - */ - void respond() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java deleted file mode 100644 index 98f0301..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/ChannelResponse.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response; - -/** - * A response to send back over channel. - */ -public interface ChannelResponse { - - /** - * @return the bytes that should be written to a channel for this response - */ - byte[] toByteArray(); - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java deleted file mode 100644 index 20102ba..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SSLSocketChannelResponder.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response.socket; - -import org.apache.nifi.processor.util.listen.response.ChannelResponse; -import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel; - -import java.io.IOException; -import java.nio.channels.SocketChannel; - -/** - * A ChannelResponder for SSLSocketChannels. - */ -public class SSLSocketChannelResponder extends SocketChannelResponder { - - private SSLSocketChannel sslSocketChannel; - - public SSLSocketChannelResponder(final SocketChannel socketChannel, final SSLSocketChannel sslSocketChannel) { - super(socketChannel); - this.sslSocketChannel = sslSocketChannel; - } - - @Override - public void respond() throws IOException { - for (final ChannelResponse response : responses) { - sslSocketChannel.write(response.toByteArray()); - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java deleted file mode 100644 index 5c20bf0..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/listen/response/socket/SocketChannelResponder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.listen.response.socket; - -import org.apache.nifi.processor.util.listen.response.ChannelResponder; -import org.apache.nifi.processor.util.listen.response.ChannelResponse; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * A ChannelResponder for SocketChannels. The SocketChannel should first be registered with a selector, - * upon being selected for writing the respond() method should be executed. - */ -public class SocketChannelResponder implements ChannelResponder<SocketChannel> { - - protected final List<ChannelResponse> responses; - protected final SocketChannel socketChannel; - - public SocketChannelResponder(final SocketChannel socketChannel) { - this.responses = new ArrayList<>(); - this.socketChannel = socketChannel; - } - - @Override - public SocketChannel getChannel() { - return socketChannel; - } - - @Override - public List<ChannelResponse> getResponses() { - return Collections.unmodifiableList(responses); - } - - @Override - public void addResponse(ChannelResponse response) { - this.responses.add(response); - } - - @Override - public void respond() throws IOException { - for (final ChannelResponse response : responses) { - final ByteBuffer responseBuffer = ByteBuffer.wrap(response.toByteArray()); - - while (responseBuffer.hasRemaining()) { - socketChannel.write(responseBuffer); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java deleted file mode 100644 index f97f31d..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -/** - * Represents a looping process was discontinued. - * When a method throws this exception, its caller should stop processing further inputs and stop immediately. - */ -public class DiscontinuedException extends RuntimeException { - public DiscontinuedException(String message) { - super(message); - } - - public DiscontinuedException(String message, Throwable cause) { - super(message, cause); - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java deleted file mode 100644 index c6cf140..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize; -import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield; - -/** - * Represents general error types and how it should be treated. - */ -public enum ErrorTypes { - - /** - * Procedure setting has to be fixed, otherwise the same error would occur irrelevant to the input. - * In order to NOT call failing process frequently, this should be yielded. - */ - PersistentFailure(ProcessException, Yield), - - /** - * It is unknown whether the error is persistent or temporal, related to the input or not. - */ - UnknownFailure(ProcessException, None), - - /** - * The input will be sent to the failure route for recovery without penalizing. - * Basically, the input should not be sent to the same procedure again unless the issue has been solved. - */ - InvalidInput(Failure, None), - - /** - * The procedure is temporarily unavailable, usually due to the external service unavailability. - * Retrying maybe successful, but it should be yielded for a while. - */ - TemporalFailure(Retry, Yield), - - /** - * The input was not processed successfully due to some temporal error - * related to the specifics of the input. Retrying maybe successful, - * but it should be penalized for a while. - */ - TemporalInputFailure(Retry, Penalize), - - /** - * The input was not ready for being processed. It will be kept in the incoming queue and also be penalized. - */ - Defer(Self, Penalize); - - private final Destination destination; - private final Penalty penalty; - ErrorTypes(Destination destination, Penalty penalty){ - this.destination = destination; - this.penalty = penalty; - } - - public Result result() { - return new Result(destination, penalty); - } - - /** - * Represents the destination of input. - */ - public enum Destination { - ProcessException, Failure, Retry, Self - } - - /** - * Indicating yield or penalize the processing when transfer the input. - */ - public enum Penalty { - Yield, Penalize, None - } - - public Destination destination(){ - return this.destination; - } - - public Penalty penalty(){ - return this.penalty; - } - - /** - * Result represents a result of a procedure. - * ErrorTypes enum contains basic error result patterns. - */ - public static class Result { - private final Destination destination; - private final Penalty penalty; - - public Result(Destination destination, Penalty penalty) { - this.destination = destination; - this.penalty = penalty; - } - - public Destination destination() { - return destination; - } - - public Penalty penalty() { - return penalty; - } - - @Override - public String toString() { - return "Result{" + - "destination=" + destination + - ", penalty=" + penalty + - '}'; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Result result = (Result) o; - - if (destination != result.destination) return false; - return penalty == result.penalty; - } - - @Override - public int hashCode() { - int result = destination != null ? destination.hashCode() : 0; - result = 31 * result + (penalty != null ? penalty.hashCode() : 0); - return result; - } - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java deleted file mode 100644 index bd1c9eb..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java +++ /dev/null @@ -1,235 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.pattern.ErrorTypes.Result; - -import java.util.Collections; -import java.util.List; -import java.util.function.BiFunction; -import java.util.function.Function; - -/** - * <p>ExceptionHandler provides a structured Exception handling logic composed by reusable partial functions. - * - * <p> - * Benefits of using ExceptionHandler: - * <li>Externalized error handling code which provides cleaner program only focusing on the expected path.</li> - * <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated error handling based on error type.</li> - * <li>Context aware error handling, {@link RollbackOnFailure} for instance.</li> - * </p> - */ -public class ExceptionHandler<C> { - - @FunctionalInterface - public interface Procedure<I> { - void apply(I input) throws Exception; - } - - public interface OnError<C, I> { - void apply(C context, I input, Result result, Exception e); - - default OnError<C, I> andThen(OnError<C, I> after) { - return (c, i, r, e) -> { - apply(c, i, r, e); - after.apply(c, i, r, e); - }; - } - } - - /** - * Simply categorise an Exception. - */ - private Function<Exception, ErrorTypes> mapException; - - /** - * Adjust error type based on the context. - */ - private BiFunction<C, ErrorTypes, Result> adjustError; - - /** - * Do some action to the input based on the final error type. - */ - private OnError<C, ?> onError; - - /** - * Specify a function that maps an Exception to certain ErrorType. - */ - public void mapException(Function<Exception, ErrorTypes> mapException) { - this.mapException = mapException; - } - - /** - * <p>Specify a function that adjust ErrorType based on a function context. - * <p>For example, {@link RollbackOnFailure#createAdjustError(ComponentLog)} decides - * whether a process session should rollback or transfer input to failure or retry. - */ - public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) { - this.adjustError = adjustError; - } - - /** - * <p>Specify a default OnError function that will be called if one is not explicitly specified when {@link #execute(Object, Object, Procedure)} is called. - */ - public void onError(OnError<C, ?> onError) { - this.onError = onError; - } - - /** - * <p>Executes specified procedure function with the input. - * <p>Default OnError function will be called when an exception is thrown. - * @param context function context - * @param input input for procedure - * @param procedure a function that does something with the input - * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}. - * @throws ProcessException Thrown if the exception was not handled by {@link OnError} - * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately - * without processing any further input - */ - @SuppressWarnings("unchecked") - public <I> boolean execute(C context, I input, Procedure<I> procedure) throws ProcessException, DiscontinuedException { - return execute(context, input, procedure, (OnError<C, I>) onError); - } - - /** - * <p>Executes specified procedure function with the input. - * @param context function context - * @param input input for procedure - * @param procedure a function that does something with the input - * @param onError specify {@link OnError} function for this execution - * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}. - * @throws ProcessException Thrown if the exception was not handled by {@link OnError} - * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately - * without processing any further input - */ - public <I> boolean execute(C context, I input, Procedure<I> procedure, OnError<C, I> onError) throws ProcessException, DiscontinuedException { - try { - procedure.apply(input); - return true; - } catch (Exception e) { - - if (mapException == null) { - throw new ProcessException("An exception was thrown: " + e, e); - } - - final ErrorTypes type = mapException.apply(e); - - final Result result; - if (adjustError != null) { - result = adjustError.apply(context, type); - } else { - result = new Result(type.destination(), type.penalty()); - } - - if (onError == null) { - throw new IllegalStateException("OnError is not set."); - } - - onError.apply(context, input, result, e); - } - return false; - } - - private static FlowFile penalize(final ProcessContext context, final ProcessSession session, - final FlowFile flowFile, final ErrorTypes.Penalty penalty) { - switch (penalty) { - case Penalize: - return session.penalize(flowFile); - case Yield: - context.yield(); - } - return flowFile; - } - - /** - * Create a {@link OnError} function instance that routes input based on {@link Result} destination and penalty. - * @param context process context is used to yield a processor - * @param session process session is used to penalize a FlowFile - * @param routingResult input FlowFile will be routed to a destination relationship in this {@link RoutingResult} - * @param relFailure specify failure relationship of a processor - * @param relRetry specify retry relationship of a processor - * @return composed function - */ - public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError( - final ProcessContext context, final ProcessSession session, final RoutingResult routingResult, - final Relationship relFailure, final Relationship relRetry) { - - return (fc, input, result, e) -> { - final PartialFunctions.FlowFileGroup flowFileGroup = () -> Collections.singletonList(input); - createOnGroupError(context, session, routingResult, relFailure, relRetry).apply(fc, flowFileGroup, result, e); - }; - } - - /** - * Same as {@link #createOnError(ProcessContext, ProcessSession, RoutingResult, Relationship, Relationship)} for FlowFileGroup. - * @param context process context is used to yield a processor - * @param session process session is used to penalize FlowFiles - * @param routingResult input FlowFiles will be routed to a destination relationship in this {@link RoutingResult} - * @param relFailure specify failure relationship of a processor - * @param relRetry specify retry relationship of a processor - * @return composed function - */ - public static <C, I extends PartialFunctions.FlowFileGroup> ExceptionHandler.OnError<C, I> createOnGroupError( - final ProcessContext context, final ProcessSession session, final RoutingResult routingResult, - final Relationship relFailure, final Relationship relRetry) { - return (c, g, r, e) -> { - final Relationship routeTo; - switch (r.destination()) { - case Failure: - routeTo = relFailure; - break; - case Retry: - routeTo = relRetry; - break; - case Self: - routeTo = Relationship.SELF; - break; - default: - if (e instanceof ProcessException) { - throw (ProcessException)e; - } else { - Object inputs = null; - if (g != null) { - final List<FlowFile> flowFiles = g.getFlowFiles(); - switch (flowFiles.size()) { - case 0: - inputs = "[]"; - break; - case 1: - inputs = flowFiles.get(0); - break; - default: - inputs = String.format("%d FlowFiles including %s", flowFiles.size(), flowFiles.get(0)); - break; - } - } - throw new ProcessException(String.format("Failed to process %s due to %s", inputs, e), e); - } - } - for (FlowFile f : g.getFlowFiles()) { - final FlowFile maybePenalized = penalize(context, session, f, r.penalty()); - routingResult.routeTo(maybePenalized, routeTo); - } - }; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java deleted file mode 100644 index 8332289..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.exception.ProcessException; - -import java.util.List; - -/** - * This class contains various partial functions those are reusable among process patterns. - */ -public class PartialFunctions { - - @FunctionalInterface - public interface InitConnection<FC, C> { - C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException; - } - - @FunctionalInterface - public interface FetchFlowFiles<FC> { - List<FlowFile> apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; - } - - @FunctionalInterface - public interface OnCompleted<FC, C> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException; - } - - @FunctionalInterface - public interface OnFailed<FC, C> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, Exception e) throws ProcessException; - } - - @FunctionalInterface - public interface Cleanup<FC, C> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException; - } - - @FunctionalInterface - public interface FlowFileGroup { - List<FlowFile> getFlowFiles(); - } - - @FunctionalInterface - public interface AdjustRoute<FC> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; - } - - @FunctionalInterface - public interface TransferFlowFiles<FC> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; - - default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) { - return (context, session, functionContext, result) -> { - apply(context, session, functionContext, result); - after.apply(context, session, functionContext, result); - }; - } - } - - public static <FCT> PartialFunctions.FetchFlowFiles<FCT> fetchSingleFlowFile() { - return (context, session, functionContext, result) -> session.get(1); - } - - public static <FCT> PartialFunctions.TransferFlowFiles<FCT> transferRoutedFlowFiles() { - return (context, session, functionContext, result) - -> result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) - -> session.transfer(routedFlowFiles, relationship))); - } - - @FunctionalInterface - public interface OnTrigger { - void execute(ProcessSession session) throws ProcessException; - } - - @FunctionalInterface - public interface RollbackSession { - void rollback(ProcessSession session, Throwable t); - } - - /** - * <p>This method is identical to what {@link org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, ProcessSession)} does.</p> - * <p>Create a session from ProcessSessionFactory and execute specified onTrigger function, and commit the session if onTrigger finishes successfully.</p> - * <p>When an Exception is thrown during execution of the onTrigger, the session will be rollback. FlowFiles being processed will be penalized.</p> - */ - public static void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws ProcessException { - onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> session.rollback(true)); - } - - public static void onTrigger( - ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger, - RollbackSession rollbackSession) throws ProcessException { - final ProcessSession session = sessionFactory.createSession(); - try { - onTrigger.execute(session); - session.commit(); - } catch (final Throwable t) { - logger.error("{} failed to process due to {}; rolling back session", new Object[]{onTrigger, t}); - rollbackSession.rollback(session, t); - throw t; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java deleted file mode 100644 index 790f48a..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java +++ /dev/null @@ -1,228 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; - -import java.util.List; -import java.util.Objects; -import java.util.stream.Collectors; - -/** - * Abstract Put pattern class with a generic onTrigger method structure, composed with various partial functions. - * @param <FC> Class of context instance which is passed to each partial functions. - * Lifetime of an function context should be limited for a single onTrigger method. - * @param <C> Class of connection to a data storage that this pattern puts data into. - */ -public class Put<FC, C extends AutoCloseable> { - protected PartialFunctions.InitConnection<FC, C> initConnection; - protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = PartialFunctions.fetchSingleFlowFile(); - protected PutFlowFile<FC, C> putFlowFile; - protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = PartialFunctions.transferRoutedFlowFiles(); - protected PartialFunctions.AdjustRoute<FC> adjustRoute; - protected PartialFunctions.OnCompleted<FC, C> onCompleted; - protected PartialFunctions.OnFailed<FC, C> onFailed; - protected PartialFunctions.Cleanup<FC, C> cleanup; - protected ComponentLog logger; - - /** - * Put fetched FlowFiles to a data storage. - * @param context process context passed from a Processor onTrigger. - * @param session process session passed from a Processor onTrigger. - * @param functionContext function context passed from a Processor onTrigger. - * @param connection connection to data storage, established by {@link PartialFunctions.InitConnection}. - * @param flowFiles FlowFiles fetched from {@link PartialFunctions.FetchFlowFiles}. - * @param result Route incoming FlowFiles if necessary. - */ - protected void putFlowFiles(ProcessContext context, ProcessSession session, - FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException { - for (FlowFile flowFile : flowFiles) { - putFlowFile.apply(context, session, functionContext, connection, flowFile, result); - } - } - - protected void validateCompositePattern() { - Objects.requireNonNull(initConnection, "InitConnection function is required."); - Objects.requireNonNull(putFlowFile, "PutFlowFile function is required."); - Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function is required."); - } - - /** - * <p>Processor using this pattern is expected to call this method from its onTrigger. - * <p>Typical usage would be constructing a process pattern instance at a processor method - * which is annotated with {@link org.apache.nifi.annotation.lifecycle.OnScheduled}, - * and use pattern.onTrigger from processor.onTrigger. - * <p>{@link PartialFunctions.InitConnection} is required at least. In addition to any functions required by an implementation class. - * @param context process context passed from a Processor onTrigger. - * @param session process session passed from a Processor onTrigger. - * @param functionContext function context should be instantiated per onTrigger call. - * @throws ProcessException Each partial function can throw ProcessException if onTrigger should stop immediately. - */ - public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException { - - validateCompositePattern(); - - final RoutingResult result = new RoutingResult(); - final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, session, functionContext, result); - - // Transfer FlowFiles if there is any. - result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) -> - session.transfer(routedFlowFiles, relationship))); - - if (flowFiles == null || flowFiles.isEmpty()) { - logger.debug("No incoming FlowFiles."); - return; - } - - try (C connection = initConnection.apply(context, session, functionContext)) { - - try { - // Execute the core function. - try { - putFlowFiles(context, session, functionContext, connection, flowFiles, result); - } catch (DiscontinuedException e) { - // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue. - // So, no logging is needed here. - } - - // Extension point to alter routes. - if (adjustRoute != null) { - adjustRoute.apply(context, session, functionContext, result); - } - - // Put fetched, but unprocessed FlowFiles back to self. - final List<FlowFile> transferredFlowFiles = result.getRoutedFlowFiles().values().stream() - .flatMap(List::stream).collect(Collectors.toList()); - final List<FlowFile> unprocessedFlowFiles = flowFiles.stream() - .filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList()); - result.routeTo(unprocessedFlowFiles, Relationship.SELF); - - // OnCompleted processing. - if (onCompleted != null) { - onCompleted.apply(context, session, functionContext, connection); - } - - // Transfer FlowFiles. - transferFlowFiles.apply(context, session, functionContext, result); - - } catch (Exception e) { - if (onFailed != null) { - onFailed.apply(context, session, functionContext, connection, e); - } - throw e; - } finally { - if (cleanup != null) { - cleanup.apply(context, session, functionContext, connection); - } - } - - } catch (ProcessException e) { - throw e; - } catch (Exception e) { - // Throw uncaught exception as RuntimeException so that this processor will be yielded. - final String msg = String.format("Failed to execute due to %s", e); - logger.error(msg, e); - throw new RuntimeException(msg, e); - } - - } - - /** - * Specify an optional function that fetches incoming FlowFIles. - * If not specified, single FlowFile is fetched on each onTrigger. - * @param f Function to fetch incoming FlowFiles. - */ - public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) { - fetchFlowFiles = f; - } - - /** - * Specify a function that establishes a connection to target data storage. - * This function will be called when there is valid incoming FlowFiles. - * The created connection instance is automatically closed when onTrigger is finished. - * @param f Function to initiate a connection to a data storage. - */ - public void initConnection(PartialFunctions.InitConnection<FC, C> f) { - initConnection = f; - } - - /** - * Specify a function that puts an incoming FlowFile to target data storage. - * @param f a function to put a FlowFile to target storage. - */ - public void putFlowFile(PutFlowFile<FC, C> f) { - this.putFlowFile = f; - } - - /** - * Specify an optional function that adjust routed FlowFiles before transfer it. - * @param f a function to adjust route. - */ - public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) { - this.adjustRoute = f; - } - - /** - * Specify an optional function responsible for transferring routed FlowFiles. - * If not specified routed FlowFiles are simply transferred to its destination by default. - * @param f a function to transfer routed FlowFiles. - */ - public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) { - this.transferFlowFiles = f; - } - - /** - * Specify an optional function which will be called if input FlowFiles were successfully put to a target storage. - * @param f Function to be called when a put operation finishes successfully. - */ - public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) { - onCompleted = f; - } - - /** - * Specify an optional function which will be called if input FlowFiles failed being put to a target storage. - * @param f Function to be called when a put operation failed. - */ - public void onFailed(PartialFunctions.OnFailed<FC, C> f) { - onFailed = f; - } - - /** - * Specify an optional function which will be called in a finally block. - * Typically useful when a special cleanup operation is needed for the connection. - * @param f Function to be called when a put operation finished regardless of whether it succeeded or not. - */ - public void cleanup(PartialFunctions.Cleanup<FC, C> f) { - cleanup = f; - } - - public void setLogger(ComponentLog logger) { - this.logger = logger; - } - - @FunctionalInterface - public interface PutFlowFile<FC, C> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, - FlowFile flowFile, RoutingResult result) throws ProcessException; - } - -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java deleted file mode 100644 index 6e9da2e..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; - -import java.util.List; -import java.util.Objects; - -/** - * Extended Put pattern capable of handling FlowFile groups. - * @param <FC> Function context class. - * @param <C> Connection class. - * @param <FFG> FlowFileGroup class. - */ -public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> { - - - public PutGroup() { - // Just to make a composition valid. - this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> { - throw new UnsupportedOperationException(); - }; - } - - @FunctionalInterface - public interface PutFlowFiles<FC, C, FFG> { - void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, - FFG inputFlowFileGroup, RoutingResult result) throws ProcessException; - } - - @Override - protected void validateCompositePattern() { - super.validateCompositePattern(); - Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required."); - } - - /** - * PutGroup does not support PutFileFile function for single FlowFile. - * Throws UnsupportedOperationException if called. - */ - @Override - public void putFlowFile(PutFlowFile<FC, C> putFlowFile) { - throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. Specify PutFlowFiles instead."); - } - - @FunctionalInterface - public interface GroupFlowFiles<FC, C, FFG> { - List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException; - } - - private GroupFlowFiles<FC, C, FFG> groupFlowFiles; - private PutFlowFiles<FC, C, FFG> putFlowFiles; - - /** - * Specify a function that groups input FlowFiles into FlowFile groups. - */ - public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) { - groupFlowFiles = f; - } - - /** - * Specify a function that puts an input FlowFile group to a target storage using a given connection. - */ - public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) { - putFlowFiles = f; - } - - - @Override - protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext, - C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException { - final List<FFG> flowFileGroups = groupFlowFiles - .apply(context, session, functionContext, connection, flowFiles, result); - - for (FFG group : flowFileGroups) { - putFlowFiles.apply(context, session, functionContext, connection, group, result); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java deleted file mode 100644 index 2d4d768..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute; - -import java.util.List; -import java.util.Map; -import java.util.function.BiFunction; - -/** - * <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide a configurable error handling. - * - * <p> - * RollbackOnFailure can add following characteristics to a processor: - * <li>When disabled, input FlowFiles caused an error will be routed to 'failure' or 'retry' relationship, based on the type of error.</li> - * <li>When enabled, input FlowFiles are kept in the input queue. A ProcessException is thrown to rollback the process session.</li> - * <li>It assumes anything happened during a processors onTrigger can rollback, if this is marked as transactional.</li> - * <li>If transactional and enabled, even if some FlowFiles are already processed, it rollbacks the session when error occurs.</li> - * <li>If not transactional and enabled, it only rollbacks the session when error occurs only if there was no progress.</li> - * </p> - * - * <p>There are two approaches to apply RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)}, - * and the other is implementing processor onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p> - * - * <p>It's also possible to use both approaches. ExceptionHandler can apply when an Exception is thrown immediately, while AdjustRoute respond later but requires less code.</p> - */ -public class RollbackOnFailure { - - private final boolean rollbackOnFailure; - private final boolean transactional; - private boolean discontinue; - - private int processedCount = 0; - - /** - * Constructor. - * @param rollbackOnFailure Should be set by user via processor configuration. - * @param transactional Specify whether a processor is transactional. - * If not, it is important to call {@link #proceed()} after successful execution of processors task, - * that indicates processor made an operation that can not be undone. - */ - public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) { - this.rollbackOnFailure = rollbackOnFailure; - this.transactional = transactional; - } - - public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty(""); - - public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) { - return new PropertyDescriptor.Builder() - .name("rollback-on-failure") - .displayName("Rollback On Failure") - .description("Specify how to handle error." + - " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" + - " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." + - " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." + - " In that case, you can do so by enabling this 'Rollback On Failure' property. " + - " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" + - " until it gets processed successfully or removed by other means." + - " It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + additionalDescription) - .allowableValues("true", "false") - .addValidator(StandardValidators.BOOLEAN_VALIDATOR) - .defaultValue("false") - .required(true) - .build(); - } - - /** - * Create a function to use with {@link ExceptionHandler} that adjust error type based on functional context. - */ - public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) { - return (c, t) -> { - - ErrorTypes.Result adjusted = null; - switch (t.destination()) { - - case ProcessException: - // If this process can rollback, then rollback it. - if (!c.canRollback()) { - // If an exception is thrown but the processor is not transactional and processed count > 0, adjust it to self, - // in order to stop any further processing until this input is processed successfully. - // If we throw an Exception in this state, the already succeeded FlowFiles will be rolled back, too. - // In case the progress was made by other preceding inputs, - // those successful inputs should be sent to 'success' and this input stays in incoming queue. - // In case this input made some progress to external system, the partial update will be replayed again, - // can cause duplicated data. - c.discontinue(); - // We should not penalize a FlowFile, if we did, other FlowFiles can be fetched first. - // We need to block others to be processed until this one finishes. - adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield); - } - break; - - case Failure: - case Retry: - if (c.isRollbackOnFailure()) { - c.discontinue(); - if (c.canRollback()) { - // If this process can rollback, then throw ProcessException instead, in order to rollback. - adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield); - } else { - // If not, - adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield); - } - } - break; - } - - if (adjusted != null) { - if (logger.isDebugEnabled()) { - logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}", - new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()}); - } - return adjusted; - } - - return t.result(); - }; - } - - /** - * Create an {@link AdjustRoute} function to use with process pattern such as {@link Put} that adjust routed FlowFiles based on context. - * This function works as a safety net by covering cases that Processor implementation did not use ExceptionHandler and transfer FlowFiles - * without considering RollbackOnFailure context. - */ - public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) { - return (context, session, fc, result) -> { - if (fc.isRollbackOnFailure()) { - // Check if route contains failure relationship. - for (Relationship failureRelationship : failureRelationships) { - if (!result.contains(failureRelationship)) { - continue; - } - if (fc.canRollback()) { - throw new ProcessException(String.format( - "A FlowFile is routed to %s. Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s", - failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional())); - } else { - // Send failed FlowFiles to self. - final Map<Relationship, List<FlowFile>> routedFlowFiles = result.getRoutedFlowFiles(); - final List<FlowFile> failedFlowFiles = routedFlowFiles.remove(failureRelationship); - result.routeTo(failedFlowFiles, Relationship.SELF); - } - } - } - }; - } - - public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) { - return onError.andThen((context, input, result, e) -> { - if (context.shouldDiscontinue()) { - throw new DiscontinuedException("Discontinue processing due to " + e, e); - } - }); - } - - public static <FCT extends RollbackOnFailure> void onTrigger( - ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger, - PartialFunctions.OnTrigger onTrigger) throws ProcessException { - - PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> { - // If RollbackOnFailure is enabled, do not penalize processing FlowFiles when rollback, - // in order to keep those in the incoming relationship to be processed again. - final boolean shouldPenalize = !functionContext.isRollbackOnFailure(); - session.rollback(shouldPenalize); - - // However, keeping failed FlowFile in the incoming relationship would retry it too often. - // So, administratively yield the process. - if (functionContext.isRollbackOnFailure()) { - logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t); - context.yield(); - } - }); - } - - public int proceed() { - return ++processedCount; - } - - public int getProcessedCount() { - return processedCount; - } - - public boolean isRollbackOnFailure() { - return rollbackOnFailure; - } - - public boolean isTransactional() { - return transactional; - } - - public boolean canRollback() { - return transactional || processedCount == 0; - } - - public boolean shouldDiscontinue() { - return discontinue; - } - - public void discontinue() { - this.discontinue = true; - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java deleted file mode 100644 index 200d893..0000000 --- a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processor.util.pattern; - -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.Relationship; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class RoutingResult { - - private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>(); - - public void routeTo(final FlowFile flowFile, final Relationship relationship) { - routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).add(flowFile); - } - - public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) { - routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).addAll(flowFiles); - } - - public void merge(final RoutingResult r) { - r.getRoutedFlowFiles().forEach((relationship, routedFlowFiles) -> routeTo(routedFlowFiles, relationship)); - } - - public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() { - return routedFlowFiles; - } - - public boolean contains(Relationship relationship) { - return routedFlowFiles.containsKey(relationship) && !routedFlowFiles.get(relationship).isEmpty(); - } -}
