NIFI-3415: Add Rollback on Failure. - Added the org.apache.nifi.processor.util.pattern package in nifi-processor-utils, containing reusable functions to mix in the 'Rollback on Failure' capability. - Created process pattern classes, Put and PutGroup. They will be helpful for standardizing Processor implementations. - Applied Rollback on Failure to PutSQL, PutHiveQL, PutHiveStreaming and PutDatabaseRecord. - Stopped using AbstractProcessor for these processors, as it penalizes the FlowFiles being processed when it rolls back a process session. If FlowFiles are penalized, they will not be fetched again until the penalization expires. - Yield the processor when a failure occurs and RollbackOnFailure is enabled. If we neither penalize nor yield, a failed FlowFile is retried too frequently. - When Rollback on Failure is enabled but the processor is not transactional, discontinue when an error occurs after successful processing. - Fixed existing issues in PutHiveStreaming: - Output FlowFile Avro format was corrupted by concatenating multiple Avro files. - Output FlowFile records had incorrect values because a GenericRecord instance was reused.
Signed-off-by: Matt Burgess <[email protected]> This closes #1658 Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d9acdb54 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d9acdb54 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d9acdb54 Branch: refs/heads/master Commit: d9acdb54bec96695837f8fcde54c58403aa46f29 Parents: a1bffbc Author: Koji Kawamura <[email protected]> Authored: Thu Mar 2 09:51:12 2017 +0900 Committer: Matt Burgess <[email protected]> Committed: Thu Apr 27 13:44:56 2017 -0400 ---------------------------------------------------------------------- nifi-commons/nifi-processor-utilities/pom.xml | 10 + .../util/pattern/DiscontinuedException.java | 31 + .../nifi/processor/util/pattern/ErrorTypes.java | 148 ++++ .../util/pattern/ExceptionHandler.java | 235 ++++++ .../util/pattern/PartialFunctions.java | 122 +++ .../apache/nifi/processor/util/pattern/Put.java | 228 ++++++ .../nifi/processor/util/pattern/PutGroup.java | 97 +++ .../util/pattern/RollbackOnFailure.java | 226 ++++++ .../processor/util/pattern/RoutingResult.java | 50 ++ .../util/pattern/TestExceptionHandler.java | 202 +++++ .../util/pattern/TestRollbackOnFailure.java | 144 ++++ .../nifi-hive-processors/pom.xml | 4 + .../hive/AbstractHiveQLProcessor.java | 10 +- .../apache/nifi/processors/hive/PutHiveQL.java | 160 ++-- .../nifi/processors/hive/PutHiveStreaming.java | 575 ++++++++------ .../nifi/processors/hive/SelectHiveQL.java | 8 +- .../nifi/processors/hive/TestPutHiveQL.java | 178 ++++- .../processors/hive/TestPutHiveStreaming.java | 493 +++++++++++- .../processors/standard/PutDatabaseRecord.java | 747 ++++++++++--------- .../apache/nifi/processors/standard/PutSQL.java | 723 ++++++++++-------- .../standard/TestPutDatabaseRecord.groovy | 142 +++- .../nifi/processors/standard/TestPutSQL.java | 328 ++++++++ 22 files changed, 3856 insertions(+), 1005 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/pom.xml b/nifi-commons/nifi-processor-utilities/pom.xml index 054f89b..ce5ae0b 100644 --- a/nifi-commons/nifi-processor-utilities/pom.xml +++ b/nifi-commons/nifi-processor-utilities/pom.xml @@ -53,5 +53,15 @@ <artifactId>nifi-ssl-context-service-api</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java new file mode 100644 index 0000000..f97f31d --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/DiscontinuedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +/** + * Represents a looping process was discontinued. + * When a method throws this exception, its caller should stop processing further inputs and stop immediately. + */ +public class DiscontinuedException extends RuntimeException { + public DiscontinuedException(String message) { + super(message); + } + + public DiscontinuedException(String message, Throwable cause) { + super(message, cause); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java new file mode 100644 index 0000000..c6cf140 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ErrorTypes.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Failure; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.ProcessException; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Retry; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Destination.Self; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.None; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Penalize; +import static org.apache.nifi.processor.util.pattern.ErrorTypes.Penalty.Yield; + +/** + * Represents general error types and how it should be treated. + */ +public enum ErrorTypes { + + /** + * Procedure setting has to be fixed, otherwise the same error would occur irrelevant to the input. + * In order to NOT call failing process frequently, this should be yielded. + */ + PersistentFailure(ProcessException, Yield), + + /** + * It is unknown whether the error is persistent or temporal, related to the input or not. + */ + UnknownFailure(ProcessException, None), + + /** + * The input will be sent to the failure route for recovery without penalizing. + * Basically, the input should not be sent to the same procedure again unless the issue has been solved. + */ + InvalidInput(Failure, None), + + /** + * The procedure is temporarily unavailable, usually due to the external service unavailability. + * Retrying maybe successful, but it should be yielded for a while. 
+ */ + TemporalFailure(Retry, Yield), + + /** + * The input was not processed successfully due to some temporal error + * related to the specifics of the input. Retrying maybe successful, + * but it should be penalized for a while. + */ + TemporalInputFailure(Retry, Penalize), + + /** + * The input was not ready for being processed. It will be kept in the incoming queue and also be penalized. + */ + Defer(Self, Penalize); + + private final Destination destination; + private final Penalty penalty; + ErrorTypes(Destination destination, Penalty penalty){ + this.destination = destination; + this.penalty = penalty; + } + + public Result result() { + return new Result(destination, penalty); + } + + /** + * Represents the destination of input. + */ + public enum Destination { + ProcessException, Failure, Retry, Self + } + + /** + * Indicating yield or penalize the processing when transfer the input. + */ + public enum Penalty { + Yield, Penalize, None + } + + public Destination destination(){ + return this.destination; + } + + public Penalty penalty(){ + return this.penalty; + } + + /** + * Result represents a result of a procedure. + * ErrorTypes enum contains basic error result patterns. 
+ */ + public static class Result { + private final Destination destination; + private final Penalty penalty; + + public Result(Destination destination, Penalty penalty) { + this.destination = destination; + this.penalty = penalty; + } + + public Destination destination() { + return destination; + } + + public Penalty penalty() { + return penalty; + } + + @Override + public String toString() { + return "Result{" + + "destination=" + destination + + ", penalty=" + penalty + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Result result = (Result) o; + + if (destination != result.destination) return false; + return penalty == result.penalty; + } + + @Override + public int hashCode() { + int result = destination != null ? destination.hashCode() : 0; + result = 31 * result + (penalty != null ? penalty.hashCode() : 0); + return result; + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java new file mode 100644 index 0000000..bd1c9eb --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/ExceptionHandler.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.pattern.ErrorTypes.Result; + +import java.util.Collections; +import java.util.List; +import java.util.function.BiFunction; +import java.util.function.Function; + +/** + * <p>ExceptionHandler provides a structured Exception handling logic composed by reusable partial functions. 
+ * + * <p> + * Benefits of using ExceptionHandler: + * <li>Externalized error handling code which provides cleaner program only focusing on the expected path.</li> + * <li>Classify specific Exceptions into {@link ErrorTypes}, consolidated error handling based on error type.</li> + * <li>Context aware error handling, {@link RollbackOnFailure} for instance.</li> + * </p> + */ +public class ExceptionHandler<C> { + + @FunctionalInterface + public interface Procedure<I> { + void apply(I input) throws Exception; + } + + public interface OnError<C, I> { + void apply(C context, I input, Result result, Exception e); + + default OnError<C, I> andThen(OnError<C, I> after) { + return (c, i, r, e) -> { + apply(c, i, r, e); + after.apply(c, i, r, e); + }; + } + } + + /** + * Simply categorise an Exception. + */ + private Function<Exception, ErrorTypes> mapException; + + /** + * Adjust error type based on the context. + */ + private BiFunction<C, ErrorTypes, Result> adjustError; + + /** + * Do some action to the input based on the final error type. + */ + private OnError<C, ?> onError; + + /** + * Specify a function that maps an Exception to certain ErrorType. + */ + public void mapException(Function<Exception, ErrorTypes> mapException) { + this.mapException = mapException; + } + + /** + * <p>Specify a function that adjust ErrorType based on a function context. + * <p>For example, {@link RollbackOnFailure#createAdjustError(ComponentLog)} decides + * whether a process session should rollback or transfer input to failure or retry. + */ + public void adjustError(BiFunction<C, ErrorTypes, Result> adjustError) { + this.adjustError = adjustError; + } + + /** + * <p>Specify a default OnError function that will be called if one is not explicitly specified when {@link #execute(Object, Object, Procedure)} is called. + */ + public void onError(OnError<C, ?> onError) { + this.onError = onError; + } + + /** + * <p>Executes specified procedure function with the input. 
+ * <p>Default OnError function will be called when an exception is thrown. + * @param context function context + * @param input input for procedure + * @param procedure a function that does something with the input + * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}. + * @throws ProcessException Thrown if the exception was not handled by {@link OnError} + * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately + * without processing any further input + */ + @SuppressWarnings("unchecked") + public <I> boolean execute(C context, I input, Procedure<I> procedure) throws ProcessException, DiscontinuedException { + return execute(context, input, procedure, (OnError<C, I>) onError); + } + + /** + * <p>Executes specified procedure function with the input. + * @param context function context + * @param input input for procedure + * @param procedure a function that does something with the input + * @param onError specify {@link OnError} function for this execution + * @return True if the procedure finished without issue. False if procedure threw an Exception but it was handled by {@link OnError}. 
+ * @throws ProcessException Thrown if the exception was not handled by {@link OnError} + * @throws DiscontinuedException Indicating the exception was handled by {@link OnError} but process should stop immediately + * without processing any further input + */ + public <I> boolean execute(C context, I input, Procedure<I> procedure, OnError<C, I> onError) throws ProcessException, DiscontinuedException { + try { + procedure.apply(input); + return true; + } catch (Exception e) { + + if (mapException == null) { + throw new ProcessException("An exception was thrown: " + e, e); + } + + final ErrorTypes type = mapException.apply(e); + + final Result result; + if (adjustError != null) { + result = adjustError.apply(context, type); + } else { + result = new Result(type.destination(), type.penalty()); + } + + if (onError == null) { + throw new IllegalStateException("OnError is not set."); + } + + onError.apply(context, input, result, e); + } + return false; + } + + private static FlowFile penalize(final ProcessContext context, final ProcessSession session, + final FlowFile flowFile, final ErrorTypes.Penalty penalty) { + switch (penalty) { + case Penalize: + return session.penalize(flowFile); + case Yield: + context.yield(); + } + return flowFile; + } + + /** + * Create a {@link OnError} function instance that routes input based on {@link Result} destination and penalty. 
+ * @param context process context is used to yield a processor + * @param session process session is used to penalize a FlowFile + * @param routingResult input FlowFile will be routed to a destination relationship in this {@link RoutingResult} + * @param relFailure specify failure relationship of a processor + * @param relRetry specify retry relationship of a processor + * @return composed function + */ + public static <C> ExceptionHandler.OnError<C, FlowFile> createOnError( + final ProcessContext context, final ProcessSession session, final RoutingResult routingResult, + final Relationship relFailure, final Relationship relRetry) { + + return (fc, input, result, e) -> { + final PartialFunctions.FlowFileGroup flowFileGroup = () -> Collections.singletonList(input); + createOnGroupError(context, session, routingResult, relFailure, relRetry).apply(fc, flowFileGroup, result, e); + }; + } + + /** + * Same as {@link #createOnError(ProcessContext, ProcessSession, RoutingResult, Relationship, Relationship)} for FlowFileGroup. 
+ * @param context process context is used to yield a processor + * @param session process session is used to penalize FlowFiles + * @param routingResult input FlowFiles will be routed to a destination relationship in this {@link RoutingResult} + * @param relFailure specify failure relationship of a processor + * @param relRetry specify retry relationship of a processor + * @return composed function + */ + public static <C, I extends PartialFunctions.FlowFileGroup> ExceptionHandler.OnError<C, I> createOnGroupError( + final ProcessContext context, final ProcessSession session, final RoutingResult routingResult, + final Relationship relFailure, final Relationship relRetry) { + return (c, g, r, e) -> { + final Relationship routeTo; + switch (r.destination()) { + case Failure: + routeTo = relFailure; + break; + case Retry: + routeTo = relRetry; + break; + case Self: + routeTo = Relationship.SELF; + break; + default: + if (e instanceof ProcessException) { + throw (ProcessException)e; + } else { + Object inputs = null; + if (g != null) { + final List<FlowFile> flowFiles = g.getFlowFiles(); + switch (flowFiles.size()) { + case 0: + inputs = "[]"; + break; + case 1: + inputs = flowFiles.get(0); + break; + default: + inputs = String.format("%d FlowFiles including %s", flowFiles.size(), flowFiles.get(0)); + break; + } + } + throw new ProcessException(String.format("Failed to process %s due to %s", inputs, e), e); + } + } + for (FlowFile f : g.getFlowFiles()) { + final FlowFile maybePenalized = penalize(context, session, f, r.penalty()); + routingResult.routeTo(maybePenalized, routeTo); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java 
b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java new file mode 100644 index 0000000..8332289 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PartialFunctions.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; + +/** + * This class contains various partial functions those are reusable among process patterns. 
+ */ +public class PartialFunctions { + + @FunctionalInterface + public interface InitConnection<FC, C> { + C apply(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException; + } + + @FunctionalInterface + public interface FetchFlowFiles<FC> { + List<FlowFile> apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; + } + + @FunctionalInterface + public interface OnCompleted<FC, C> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException; + } + + @FunctionalInterface + public interface OnFailed<FC, C> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, Exception e) throws ProcessException; + } + + @FunctionalInterface + public interface Cleanup<FC, C> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection) throws ProcessException; + } + + @FunctionalInterface + public interface FlowFileGroup { + List<FlowFile> getFlowFiles(); + } + + @FunctionalInterface + public interface AdjustRoute<FC> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; + } + + @FunctionalInterface + public interface TransferFlowFiles<FC> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, RoutingResult result) throws ProcessException; + + default TransferFlowFiles<FC> andThen(TransferFlowFiles<FC> after) { + return (context, session, functionContext, result) -> { + apply(context, session, functionContext, result); + after.apply(context, session, functionContext, result); + }; + } + } + + public static <FCT> PartialFunctions.FetchFlowFiles<FCT> fetchSingleFlowFile() { + return (context, session, functionContext, result) -> session.get(1); + } + + public static <FCT> PartialFunctions.TransferFlowFiles<FCT> transferRoutedFlowFiles() { + return 
(context, session, functionContext, result) + -> result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) + -> session.transfer(routedFlowFiles, relationship))); + } + + @FunctionalInterface + public interface OnTrigger { + void execute(ProcessSession session) throws ProcessException; + } + + @FunctionalInterface + public interface RollbackSession { + void rollback(ProcessSession session, Throwable t); + } + + /** + * <p>This method is identical to what {@link org.apache.nifi.processor.AbstractProcessor#onTrigger(ProcessContext, ProcessSession)} does.</p> + * <p>Create a session from ProcessSessionFactory and execute specified onTrigger function, and commit the session if onTrigger finishes successfully.</p> + * <p>When an Exception is thrown during execution of the onTrigger, the session will be rollback. FlowFiles being processed will be penalized.</p> + */ + public static void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger) throws ProcessException { + onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> session.rollback(true)); + } + + public static void onTrigger( + ProcessContext context, ProcessSessionFactory sessionFactory, ComponentLog logger, OnTrigger onTrigger, + RollbackSession rollbackSession) throws ProcessException { + final ProcessSession session = sessionFactory.createSession(); + try { + onTrigger.execute(session); + session.commit(); + } catch (final Throwable t) { + logger.error("{} failed to process due to {}; rolling back session", new Object[]{onTrigger, t}); + rollbackSession.rollback(session, t); + throw t; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java ---------------------------------------------------------------------- diff --git 
a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java new file mode 100644 index 0000000..790f48a --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/Put.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Abstract Put pattern class with a generic onTrigger method structure, composed with various partial functions. + * @param <FC> Class of context instance which is passed to each partial functions. + * Lifetime of an function context should be limited for a single onTrigger method. 
+ * @param <C> Class of connection to a data storage that this pattern puts data into. + */ +public class Put<FC, C extends AutoCloseable> { + protected PartialFunctions.InitConnection<FC, C> initConnection; + protected PartialFunctions.FetchFlowFiles<FC> fetchFlowFiles = PartialFunctions.fetchSingleFlowFile(); + protected PutFlowFile<FC, C> putFlowFile; + protected PartialFunctions.TransferFlowFiles<FC> transferFlowFiles = PartialFunctions.transferRoutedFlowFiles(); + protected PartialFunctions.AdjustRoute<FC> adjustRoute; + protected PartialFunctions.OnCompleted<FC, C> onCompleted; + protected PartialFunctions.OnFailed<FC, C> onFailed; + protected PartialFunctions.Cleanup<FC, C> cleanup; + protected ComponentLog logger; + + /** + * Put fetched FlowFiles to a data storage. + * @param context process context passed from a Processor onTrigger. + * @param session process session passed from a Processor onTrigger. + * @param functionContext function context passed from a Processor onTrigger. + * @param connection connection to data storage, established by {@link PartialFunctions.InitConnection}. + * @param flowFiles FlowFiles fetched from {@link PartialFunctions.FetchFlowFiles}. + * @param result Route incoming FlowFiles if necessary. + */ + protected void putFlowFiles(ProcessContext context, ProcessSession session, + FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException { + for (FlowFile flowFile : flowFiles) { + putFlowFile.apply(context, session, functionContext, connection, flowFile, result); + } + } + + protected void validateCompositePattern() { + Objects.requireNonNull(initConnection, "InitConnection function is required."); + Objects.requireNonNull(putFlowFile, "PutFlowFile function is required."); + Objects.requireNonNull(transferFlowFiles, "TransferFlowFiles function is required."); + } + + /** + * <p>Processor using this pattern is expected to call this method from its onTrigger. 
+ * <p>Typical usage would be constructing a process pattern instance at a processor method + * which is annotated with {@link org.apache.nifi.annotation.lifecycle.OnScheduled}, + * and use pattern.onTrigger from processor.onTrigger. + * <p>{@link PartialFunctions.InitConnection} is required at least. In addition to any functions required by an implementation class. + * @param context process context passed from a Processor onTrigger. + * @param session process session passed from a Processor onTrigger. + * @param functionContext function context should be instantiated per onTrigger call. + * @throws ProcessException Each partial function can throw ProcessException if onTrigger should stop immediately. + */ + public void onTrigger(ProcessContext context, ProcessSession session, FC functionContext) throws ProcessException { + + validateCompositePattern(); + + final RoutingResult result = new RoutingResult(); + final List<FlowFile> flowFiles = fetchFlowFiles.apply(context, session, functionContext, result); + + // Transfer FlowFiles if there is any. + result.getRoutedFlowFiles().forEach(((relationship, routedFlowFiles) -> + session.transfer(routedFlowFiles, relationship))); + + if (flowFiles == null || flowFiles.isEmpty()) { + logger.debug("No incoming FlowFiles."); + return; + } + + try (C connection = initConnection.apply(context, session, functionContext)) { + + try { + // Execute the core function. + try { + putFlowFiles(context, session, functionContext, connection, flowFiles, result); + } catch (DiscontinuedException e) { + // Whether it was an error or semi normal is depends on the implementation and reason why it wanted to discontinue. + // So, no logging is needed here. + } + + // Extension point to alter routes. + if (adjustRoute != null) { + adjustRoute.apply(context, session, functionContext, result); + } + + // Put fetched, but unprocessed FlowFiles back to self. 
+ final List<FlowFile> transferredFlowFiles = result.getRoutedFlowFiles().values().stream() + .flatMap(List::stream).collect(Collectors.toList()); + final List<FlowFile> unprocessedFlowFiles = flowFiles.stream() + .filter(flowFile -> !transferredFlowFiles.contains(flowFile)).collect(Collectors.toList()); + result.routeTo(unprocessedFlowFiles, Relationship.SELF); + + // OnCompleted processing. + if (onCompleted != null) { + onCompleted.apply(context, session, functionContext, connection); + } + + // Transfer FlowFiles. + transferFlowFiles.apply(context, session, functionContext, result); + + } catch (Exception e) { + if (onFailed != null) { + onFailed.apply(context, session, functionContext, connection, e); + } + throw e; + } finally { + if (cleanup != null) { + cleanup.apply(context, session, functionContext, connection); + } + } + + } catch (ProcessException e) { + throw e; + } catch (Exception e) { + // Throw uncaught exception as RuntimeException so that this processor will be yielded. + final String msg = String.format("Failed to execute due to %s", e); + logger.error(msg, e); + throw new RuntimeException(msg, e); + } + + } + + /** + * Specify an optional function that fetches incoming FlowFIles. + * If not specified, single FlowFile is fetched on each onTrigger. + * @param f Function to fetch incoming FlowFiles. + */ + public void fetchFlowFiles(PartialFunctions.FetchFlowFiles<FC> f) { + fetchFlowFiles = f; + } + + /** + * Specify a function that establishes a connection to target data storage. + * This function will be called when there is valid incoming FlowFiles. + * The created connection instance is automatically closed when onTrigger is finished. + * @param f Function to initiate a connection to a data storage. + */ + public void initConnection(PartialFunctions.InitConnection<FC, C> f) { + initConnection = f; + } + + /** + * Specify a function that puts an incoming FlowFile to target data storage. 
+ * @param f a function to put a FlowFile to target storage. + */ + public void putFlowFile(PutFlowFile<FC, C> f) { + this.putFlowFile = f; + } + + /** + * Specify an optional function that adjust routed FlowFiles before transfer it. + * @param f a function to adjust route. + */ + public void adjustRoute(PartialFunctions.AdjustRoute<FC> f) { + this.adjustRoute = f; + } + + /** + * Specify an optional function responsible for transferring routed FlowFiles. + * If not specified routed FlowFiles are simply transferred to its destination by default. + * @param f a function to transfer routed FlowFiles. + */ + public void transferFlowFiles(PartialFunctions.TransferFlowFiles<FC> f) { + this.transferFlowFiles = f; + } + + /** + * Specify an optional function which will be called if input FlowFiles were successfully put to a target storage. + * @param f Function to be called when a put operation finishes successfully. + */ + public void onCompleted(PartialFunctions.OnCompleted<FC, C> f) { + onCompleted = f; + } + + /** + * Specify an optional function which will be called if input FlowFiles failed being put to a target storage. + * @param f Function to be called when a put operation failed. + */ + public void onFailed(PartialFunctions.OnFailed<FC, C> f) { + onFailed = f; + } + + /** + * Specify an optional function which will be called in a finally block. + * Typically useful when a special cleanup operation is needed for the connection. + * @param f Function to be called when a put operation finished regardless of whether it succeeded or not. 
+ */ + public void cleanup(PartialFunctions.Cleanup<FC, C> f) { + cleanup = f; + } + + public void setLogger(ComponentLog logger) { + this.logger = logger; + } + + @FunctionalInterface + public interface PutFlowFile<FC, C> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, + FlowFile flowFile, RoutingResult result) throws ProcessException; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java new file mode 100644 index 0000000..6e9da2e --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/PutGroup.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; + +import java.util.List; +import java.util.Objects; + +/** + * Extended Put pattern capable of handling FlowFile groups. + * @param <FC> Function context class. + * @param <C> Connection class. + * @param <FFG> FlowFileGroup class. + */ +public class PutGroup<FC, C extends AutoCloseable, FFG extends PartialFunctions.FlowFileGroup> extends Put<FC, C> { + + + public PutGroup() { + // Just to make a composition valid. + this.putFlowFile = (context, session, functionContext, connection, inputFlowFile, result) -> { + throw new UnsupportedOperationException(); + }; + } + + @FunctionalInterface + public interface PutFlowFiles<FC, C, FFG> { + void apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, + FFG inputFlowFileGroup, RoutingResult result) throws ProcessException; + } + + @Override + protected void validateCompositePattern() { + super.validateCompositePattern(); + Objects.requireNonNull(groupFlowFiles, "GroupFlowFiles function is required."); + } + + /** + * PutGroup does not support PutFileFile function for single FlowFile. + * Throws UnsupportedOperationException if called. + */ + @Override + public void putFlowFile(PutFlowFile<FC, C> putFlowFile) { + throw new UnsupportedOperationException("PutFlowFile can not be used with PutGroup pattern. 
Specify PutFlowFiles instead."); + } + + @FunctionalInterface + public interface GroupFlowFiles<FC, C, FFG> { + List<FFG> apply(ProcessContext context, ProcessSession session, FC functionContext, C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException; + } + + private GroupFlowFiles<FC, C, FFG> groupFlowFiles; + private PutFlowFiles<FC, C, FFG> putFlowFiles; + + /** + * Specify a function that groups input FlowFiles into FlowFile groups. + */ + public void groupFetchedFlowFiles(GroupFlowFiles<FC, C, FFG> f) { + groupFlowFiles = f; + } + + /** + * Specify a function that puts an input FlowFile group to a target storage using a given connection. + */ + public void putFlowFiles(PutFlowFiles<FC, C, FFG> f) { + putFlowFiles = f; + } + + + @Override + protected void putFlowFiles(ProcessContext context, ProcessSession session, FC functionContext, + C connection, List<FlowFile> flowFiles, RoutingResult result) throws ProcessException { + final List<FFG> flowFileGroups = groupFlowFiles + .apply(context, session, functionContext, connection, flowFiles, result); + + for (FFG group : flowFileGroups) { + putFlowFiles.apply(context, session, functionContext, connection, group, result); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java new file mode 100644 index 0000000..2d4d768 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RollbackOnFailure.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processor.util.pattern.PartialFunctions.AdjustRoute; + +import java.util.List; +import java.util.Map; +import java.util.function.BiFunction; + +/** + * <p>RollbackOnFailure can be used as a function context for process patterns such as {@link Put} to provide a configurable error handling. + * + * <p> + * RollbackOnFailure can add following characteristics to a processor: + * <li>When disabled, input FlowFiles caused an error will be routed to 'failure' or 'retry' relationship, based on the type of error.</li> + * <li>When enabled, input FlowFiles are kept in the input queue. 
A ProcessException is thrown to rollback the process session.</li> + * <li>It assumes anything happened during a processors onTrigger can rollback, if this is marked as transactional.</li> + * <li>If transactional and enabled, even if some FlowFiles are already processed, it rollbacks the session when error occurs.</li> + * <li>If not transactional and enabled, it only rollbacks the session when error occurs only if there was no progress.</li> + * </p> + * + * <p>There are two approaches to apply RollbackOnFailure. One is using {@link ExceptionHandler#adjustError(BiFunction)}, + * and the other is implementing processor onTrigger using process patterns such as {@link Put#adjustRoute(AdjustRoute)}. </p> + * + * <p>It's also possible to use both approaches. ExceptionHandler can apply when an Exception is thrown immediately, while AdjustRoute respond later but requires less code.</p> + */ +public class RollbackOnFailure { + + private final boolean rollbackOnFailure; + private final boolean transactional; + private boolean discontinue; + + private int processedCount = 0; + + /** + * Constructor. + * @param rollbackOnFailure Should be set by user via processor configuration. + * @param transactional Specify whether a processor is transactional. + * If not, it is important to call {@link #proceed()} after successful execution of processors task, + * that indicates processor made an operation that can not be undone. + */ + public RollbackOnFailure(boolean rollbackOnFailure, boolean transactional) { + this.rollbackOnFailure = rollbackOnFailure; + this.transactional = transactional; + } + + public static final PropertyDescriptor ROLLBACK_ON_FAILURE = createRollbackOnFailureProperty(""); + + public static PropertyDescriptor createRollbackOnFailureProperty(String additionalDescription) { + return new PropertyDescriptor.Builder() + .name("rollback-on-failure") + .displayName("Rollback On Failure") + .description("Specify how to handle error." 
+ + " By default (false), if an error occurs while processing a FlowFile, the FlowFile will be routed to" + + " 'failure' or 'retry' relationship based on error type, and processor can continue with next FlowFile." + + " Instead, you may want to rollback currently processed FlowFiles and stop further processing immediately." + + " In that case, you can do so by enabling this 'Rollback On Failure' property. " + + " If enabled, failed FlowFiles will stay in the input relationship without penalizing it and being processed repeatedly" + + " until it gets processed successfully or removed by other means." + + " It is important to set adequate 'Yield Duration' to avoid retrying too frequently." + additionalDescription) + .allowableValues("true", "false") + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .defaultValue("false") + .required(true) + .build(); + } + + /** + * Create a function to use with {@link ExceptionHandler} that adjust error type based on functional context. + */ + public static <FCT extends RollbackOnFailure> BiFunction<FCT, ErrorTypes, ErrorTypes.Result> createAdjustError(final ComponentLog logger) { + return (c, t) -> { + + ErrorTypes.Result adjusted = null; + switch (t.destination()) { + + case ProcessException: + // If this process can rollback, then rollback it. + if (!c.canRollback()) { + // If an exception is thrown but the processor is not transactional and processed count > 0, adjust it to self, + // in order to stop any further processing until this input is processed successfully. + // If we throw an Exception in this state, the already succeeded FlowFiles will be rolled back, too. + // In case the progress was made by other preceding inputs, + // those successful inputs should be sent to 'success' and this input stays in incoming queue. + // In case this input made some progress to external system, the partial update will be replayed again, + // can cause duplicated data. 
+ c.discontinue(); + // We should not penalize a FlowFile, if we did, other FlowFiles can be fetched first. + // We need to block others to be processed until this one finishes. + adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield); + } + break; + + case Failure: + case Retry: + if (c.isRollbackOnFailure()) { + c.discontinue(); + if (c.canRollback()) { + // If this process can rollback, then throw ProcessException instead, in order to rollback. + adjusted = new ErrorTypes.Result(ErrorTypes.Destination.ProcessException, ErrorTypes.Penalty.Yield); + } else { + // If not, + adjusted = new ErrorTypes.Result(ErrorTypes.Destination.Self, ErrorTypes.Penalty.Yield); + } + } + break; + } + + if (adjusted != null) { + if (logger.isDebugEnabled()) { + logger.debug("Adjusted {} to {} based on context rollbackOnFailure={}, processedCount={}, transactional={}", + new Object[]{t, adjusted, c.isRollbackOnFailure(), c.getProcessedCount(), c.isTransactional()}); + } + return adjusted; + } + + return t.result(); + }; + } + + /** + * Create an {@link AdjustRoute} function to use with process pattern such as {@link Put} that adjust routed FlowFiles based on context. + * This function works as a safety net by covering cases that Processor implementation did not use ExceptionHandler and transfer FlowFiles + * without considering RollbackOnFailure context. + */ + public static <FCT extends RollbackOnFailure> AdjustRoute<FCT> createAdjustRoute(Relationship ... failureRelationships) { + return (context, session, fc, result) -> { + if (fc.isRollbackOnFailure()) { + // Check if route contains failure relationship. + for (Relationship failureRelationship : failureRelationships) { + if (!result.contains(failureRelationship)) { + continue; + } + if (fc.canRollback()) { + throw new ProcessException(String.format( + "A FlowFile is routed to %s. 
Rollback session based on context rollbackOnFailure=%s, processedCount=%d, transactional=%s", + failureRelationship.getName(), fc.isRollbackOnFailure(), fc.getProcessedCount(), fc.isTransactional())); + } else { + // Send failed FlowFiles to self. + final Map<Relationship, List<FlowFile>> routedFlowFiles = result.getRoutedFlowFiles(); + final List<FlowFile> failedFlowFiles = routedFlowFiles.remove(failureRelationship); + result.routeTo(failedFlowFiles, Relationship.SELF); + } + } + } + }; + } + + public static <FCT extends RollbackOnFailure, I> ExceptionHandler.OnError<FCT, I> createOnError(ExceptionHandler.OnError<FCT, I> onError) { + return onError.andThen((context, input, result, e) -> { + if (context.shouldDiscontinue()) { + throw new DiscontinuedException("Discontinue processing due to " + e, e); + } + }); + } + + public static <FCT extends RollbackOnFailure> void onTrigger( + ProcessContext context, ProcessSessionFactory sessionFactory, FCT functionContext, ComponentLog logger, + PartialFunctions.OnTrigger onTrigger) throws ProcessException { + + PartialFunctions.onTrigger(context, sessionFactory, logger, onTrigger, (session, t) -> { + // If RollbackOnFailure is enabled, do not penalize processing FlowFiles when rollback, + // in order to keep those in the incoming relationship to be processed again. + final boolean shouldPenalize = !functionContext.isRollbackOnFailure(); + session.rollback(shouldPenalize); + + // However, keeping failed FlowFile in the incoming relationship would retry it too often. + // So, administratively yield the process. 
+ if (functionContext.isRollbackOnFailure()) { + logger.warn("Administratively yielding {} after rolling back due to {}", new Object[]{context.getName(), t}, t); + context.yield(); + } + }); + } + + public int proceed() { + return ++processedCount; + } + + public int getProcessedCount() { + return processedCount; + } + + public boolean isRollbackOnFailure() { + return rollbackOnFailure; + } + + public boolean isTransactional() { + return transactional; + } + + public boolean canRollback() { + return transactional || processedCount == 0; + } + + public boolean shouldDiscontinue() { + return discontinue; + } + + public void discontinue() { + this.discontinue = true; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java new file mode 100644 index 0000000..200d893 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/main/java/org/apache/nifi/processor/util/pattern/RoutingResult.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.Relationship; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class RoutingResult { + + private final Map<Relationship, List<FlowFile>> routedFlowFiles = new HashMap<>(); + + public void routeTo(final FlowFile flowFile, final Relationship relationship) { + routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).add(flowFile); + } + + public void routeTo(final List<FlowFile> flowFiles, final Relationship relationship) { + routedFlowFiles.computeIfAbsent(relationship, r -> new ArrayList<>()).addAll(flowFiles); + } + + public void merge(final RoutingResult r) { + r.getRoutedFlowFiles().forEach((relationship, routedFlowFiles) -> routeTo(routedFlowFiles, relationship)); + } + + public Map<Relationship, List<FlowFile>> getRoutedFlowFiles() { + return routedFlowFiles; + } + + public boolean contains(Relationship relationship) { + return routedFlowFiles.containsKey(relationship) && !routedFlowFiles.get(relationship).isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java 
b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java new file mode 100644 index 0000000..bd73379 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestExceptionHandler.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.processor.exception.ProcessException; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class TestExceptionHandler { + + private static final Logger logger = LoggerFactory.getLogger(TestExceptionHandler.class); + + /** + * Simulate an external procedure. 
+ */ + static class ExternalProcedure { + private boolean available = true; + int divide(Integer a, Integer b) throws Exception { + if (!available) { + throw new IOException("Not available"); + } + if (a == 10) { + throw new IllegalStateException("Service for 10 is not currently available."); + } + return a / b; + } + } + + private class Context { + int count = 0; + } + + @Test + public void testBasicUsage() { + + final ExternalProcedure p = new ExternalProcedure(); + + try { + // Although a catch-exception has to be caught each possible call, + // usually the error handling logic will be the same. + // Ends up having a lot of same code. + final int r1 = p.divide(4, 2); + assertEquals(2, r1); + } catch (Exception e) { + e.printStackTrace(); + } + + final Context context = new Context(); + final ExceptionHandler<Context> handler = new ExceptionHandler<>(); + + // Using handler can avoid the try catch block with reusable error handling logic. + handler.execute(context, 6, i -> { + final int r2 = p.divide(i, 2); + assertEquals(3, r2); + }); + + // If return value is needed, use AtomicReference. + AtomicReference<Integer> r = new AtomicReference<>(); + handler.execute(context, 8, i -> r.set(p.divide(i, 2))); + assertEquals(4, r.get().intValue()); + + // If no exception mapping is specified, any Exception thrown is wrapped by ProcessException. + try { + final Integer nullInput = null; + handler.execute(context, nullInput, i -> r.set(p.divide(i, 2))); + fail("Exception should be thrown because input is null."); + } catch (ProcessException e) { + assertTrue(e.getCause() instanceof NullPointerException); + } + } + + // Reusable Exception mapping function. 
+ static Function<Exception, ErrorTypes> exceptionMapping = i -> { + try { + throw i; + } catch (NullPointerException | ArithmeticException | NumberFormatException e) { + return ErrorTypes.InvalidInput; + } catch (IllegalStateException e) { + return ErrorTypes.TemporalInputFailure; + } catch (IOException e) { + return ErrorTypes.TemporalFailure; + } catch (Exception e) { + throw new ProcessException(e); + } + }; + + @Test + public void testHandling() { + + final ExternalProcedure p = new ExternalProcedure(); + final Context context = new Context(); + + final ExceptionHandler<Context> handler = new ExceptionHandler<>(); + handler.mapException(exceptionMapping); + handler.onError(createInputErrorHandler()); + + // Benefit of handler is being able to externalize error handling, make it simpler. + handler.execute(context, 4, i -> { + final int r = p.divide(i, 2); + assertEquals(2, r); + }); + + // Null pointer exception. + final Integer input = null; + handler.execute(context, input, i -> { + p.divide(i, 2); + fail("Shouldn't reach here."); + }); + + // Divide by zero. 
+ handler.execute(context, 0, i -> { + p.divide(2, i); + fail("Shouldn't reach here."); + }); + + + } + + static <C> ExceptionHandler.OnError<C, Integer> createInputErrorHandler() { + return (c, i, r, e) -> { + switch (r.destination()) { + case ProcessException: + throw new ProcessException(String.format("Execution failed due to %s", e), e); + default: + logger.warn(String.format("Routing to %s: %d caused %s", r, i, e)); + } + }; + } + + static <C> ExceptionHandler.OnError<C, Integer[]> createArrayInputErrorHandler() { + return (c, i, r, e) -> { + switch (r.destination()) { + case ProcessException: + throw new ProcessException(String.format("Execution failed due to %s", e), e); + default: + logger.warn(String.format("Routing to %s: %d, %d caused %s", r, i[0], i[1], e)); + } + }; + } + + @Test + public void testHandlingLoop() { + + final ExternalProcedure p = new ExternalProcedure(); + final Context context = new Context(); + + final ExceptionHandler<Context> handler = new ExceptionHandler<>(); + handler.mapException(exceptionMapping); + handler.onError(createArrayInputErrorHandler()); + + // It's especially handy when looping through inputs. [a, b, expected result] + Integer[][] inputs = new Integer[][]{{4, 2, 2}, {null, 2, 999}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; + + Arrays.stream(inputs).forEach(input -> handler.execute(context, input, (in) -> { + final Integer r = p.divide(in[0], in[1]); + // This is safe because if p.divide throws error, this code won't be executed. + assertEquals(in[2], r); + })); + + AtomicReference<Integer> r = new AtomicReference<>(); + for (Integer[] input : inputs) { + + if (!handler.execute(context, input, (in) -> { + r.set(p.divide(in[0], in[1])); + context.count++; + })){ + // Handler returns false when it fails. + // Cleaner if-exception-continue-next-input can be written cleaner. 
+ continue; + } + + assertEquals(input[2], r.get()); + } + + assertEquals("Successful inputs", 2, context.count); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java new file mode 100644 index 0000000..6d73759 --- /dev/null +++ b/nifi-commons/nifi-processor-utilities/src/test/java/org/apache/nifi/processor/util/pattern/TestRollbackOnFailure.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processor.util.pattern; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.pattern.TestExceptionHandler.ExternalProcedure; +import org.apache.nifi.util.MockComponentLog; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.createArrayInputErrorHandler; +import static org.apache.nifi.processor.util.pattern.TestExceptionHandler.exceptionMapping; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class TestRollbackOnFailure { + + private static final Logger logger = LoggerFactory.getLogger(TestRollbackOnFailure.class); + + /** + * This can be an example for how to compose an ExceptionHandler instance by reusable functions. + * @param logger used to log messages within functions + * @return a composed ExceptionHandler + */ + private ExceptionHandler<RollbackOnFailure> getContextAwareExceptionHandler(ComponentLog logger) { + final ExceptionHandler<RollbackOnFailure> handler = new ExceptionHandler<>(); + handler.mapException(exceptionMapping); + handler.adjustError(RollbackOnFailure.createAdjustError(logger)); + handler.onError(createArrayInputErrorHandler()); + return handler; + } + + private void processInputs(RollbackOnFailure context, Integer[][] inputs, List<Integer> results) { + final ExternalProcedure p = new ExternalProcedure(); + final MockComponentLog componentLog = new MockComponentLog("processor-id", this); + final ExceptionHandler<RollbackOnFailure> handler = getContextAwareExceptionHandler(componentLog); + + for (Integer[] input : inputs) { + + if (!handler.execute(context, input, (in) -> { + results.add(p.divide(in[0], in[1])); + context.proceed(); + })){ + continue; + } + + assertEquals(input[2], results.get(results.size() - 
1)); + } + } + + @Test + public void testContextDefaultBehavior() { + + // Disabling rollbackOnFailure would route Failure or Retry as they are. + final RollbackOnFailure context = new RollbackOnFailure(false, false); + + Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; + + final List<Integer> results = new ArrayList<>(); + try { + processInputs(context, inputs, results); + } catch (ProcessException e) { + fail("ProcessException should NOT be thrown"); + } + + assertEquals("Successful inputs", 2, context.getProcessedCount()); + } + + @Test + public void testContextRollbackOnFailureNonTransactionalFirstFailure() { + + final RollbackOnFailure context = new RollbackOnFailure(true, false); + + // If the first execution fails without any succeeded inputs, it should throw a ProcessException. + Integer[][] inputs = new Integer[][]{{null, 2, 999}, {4, 2, 2}, {2, 0, 999}, {10, 2, 999}, {8, 2, 4}}; + + final List<Integer> results = new ArrayList<>(); + try { + processInputs(context, inputs, results); + fail("ProcessException should be thrown"); + } catch (ProcessException e) { + logger.info("Exception was thrown as expected."); + } + + assertEquals("Successful inputs", 0, context.getProcessedCount()); + } + + @Test + public void testContextRollbackOnFailureNonTransactionalAlreadySucceeded() { + + final RollbackOnFailure context = new RollbackOnFailure(true, false); + + // If an execution fails after succeeded inputs, it transfer the input to Failure instead of ProcessException, + // and keep going. Because the external system does not support transaction. 
+ Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}}; + + final List<Integer> results = new ArrayList<>(); + try { + processInputs(context, inputs, results); + } catch (ProcessException e) { + fail("ProcessException should NOT be thrown"); + } + + assertEquals("Successful inputs", 2, context.getProcessedCount()); + } + + @Test + public void testContextRollbackOnFailureTransactionalAlreadySucceeded() { + + final RollbackOnFailure context = new RollbackOnFailure(true, true); + + // Even if an execution fails after succeeded inputs, it throws a ProcessException instead of transferring the input to Failure, + // because the external system supports transactions. + Integer[][] inputs = new Integer[][]{{4, 2, 2}, {2, 0, 999}, {null, 2, 999}, {10, 2, 999}, {8, 2, 4}}; + + final List<Integer> results = new ArrayList<>(); + try { + processInputs(context, inputs, results); + fail("ProcessException should be thrown"); + } catch (ProcessException e) { + logger.info("Exception was thrown as expected."); + } + + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml index ea6e5df..661180e 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/pom.xml @@ -32,6 +32,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-processor-utils</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-dbcp-service-api</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/d9acdb54/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java 
---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java index 3835ff7..1a2110a 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/AbstractHiveQLProcessor.java @@ -19,9 +19,8 @@ package org.apache.nifi.processors.hive; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.stream.io.StreamUtils; @@ -30,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.math.BigDecimal; import java.nio.charset.Charset; +import java.sql.SQLDataException; import java.sql.Time; import java.sql.Timestamp; import java.sql.Date; @@ -45,7 +45,7 @@ import java.util.regex.Pattern; /** * An abstract base class for HiveQL processors to share common data, methods, etc. 
*/ -public abstract class AbstractHiveQLProcessor extends AbstractProcessor { +public abstract class AbstractHiveQLProcessor extends AbstractSessionFactoryProcessor { protected static final Pattern HIVEQL_TYPE_ATTRIBUTE_PATTERN = Pattern.compile("hiveql\\.args\\.(\\d+)\\.type"); protected static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); @@ -112,7 +112,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor { if (parameterIndex >= base && parameterIndex < base + paramCount) { final boolean isNumeric = NUMBER_PATTERN.matcher(entry.getValue()).matches(); if (!isNumeric) { - throw new ProcessException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType"); + throw new SQLDataException("Value of the " + key + " attribute is '" + entry.getValue() + "', which is not a valid JDBC numeral jdbcType"); } final String valueAttrName = "hiveql.args." + parameterIndex + ".value"; @@ -139,7 +139,7 @@ public abstract class AbstractHiveQLProcessor extends AbstractProcessor { try { setParameter(stmt, ph.attributeName, index, ph.value, ph.jdbcType); } catch (final NumberFormatException nfe) { - throw new ProcessException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe); + throw new SQLDataException("The value of the " + ph.attributeName + " is '" + ph.value + "', which cannot be converted into the necessary data jdbcType", nfe); } } return base + paramCount;
