NIFI-6: Added annotations with new package names to be more explicit; deprecated all old annotations; updated framework to use new annotations and old
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/1c0eb6c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/1c0eb6c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/1c0eb6c6 Branch: refs/heads/NIFI-250 Commit: 1c0eb6c66e297778df04fc7affbfe282068e18cf Parents: 73384b2 Author: Mark Payne <[email protected]> Authored: Thu Jan 15 15:14:49 2015 -0500 Committer: Mark Payne <[email protected]> Committed: Fri Jan 16 12:05:23 2015 -0500 ---------------------------------------------------------------------- .../nifi/annotation/behavior/EventDriven.java | 49 ++++ .../annotation/behavior/SideEffectFree.java | 47 ++++ .../annotation/behavior/SupportsBatching.java | 52 +++++ .../annotation/behavior/TriggerSerially.java | 40 ++++ .../TriggerWhenAnyDestinationAvailable.java | 42 ++++ .../annotation/behavior/TriggerWhenEmpty.java | 42 ++++ .../documentation/CapabilityDescription.java | 41 ++++ .../nifi/annotation/documentation/Tags.java | 46 ++++ .../nifi/annotation/lifecycle/OnAdded.java | 44 ++++ .../nifi/annotation/lifecycle/OnRemoved.java | 43 ++++ .../nifi/annotation/lifecycle/OnScheduled.java | 58 +++++ .../nifi/annotation/lifecycle/OnShutdown.java | 42 ++++ .../nifi/annotation/lifecycle/OnStopped.java | 59 +++++ .../annotation/lifecycle/OnUnscheduled.java | 44 ++++ .../apache/nifi/controller/FlowController.java | 5 +- .../nifi/controller/StandardProcessorNode.java | 46 ++-- .../scheduling/EventDrivenSchedulingAgent.java | 6 +- .../scheduling/StandardProcessScheduler.java | 15 +- .../tasks/ContinuallyRunConnectableTask.java | 6 +- .../tasks/ContinuallyRunProcessorTask.java | 6 +- .../controller/tasks/ReportingTaskWrapper.java | 6 +- .../nifi/groups/StandardProcessGroup.java | 11 +- .../org/apache/nifi/util/ReflectionUtils.java | 230 +++++++++++++------ .../StubAttributeLoggerProcessor.java | 2 +- .../org/apache/nifi/web/api/dto/DtoFactory.java | 26 ++- .../hadoop/AbstractHadoopProcessor.java | 5 +- .../hadoop/CreateHadoopSequenceFile.java | 7 +- .../apache/nifi/processors/hadoop/GetHDFS.java | 9 +- .../processors/hadoop/GetHDFSSequenceFile.java | 7 +- .../apache/nifi/processors/hadoop/PutHDFS.java | 6 +- .../apache/nifi/processors/kafka/GetKafka.java | 12 +- .../apache/nifi/processors/kafka/PutKafka.java | 8 +- .../nifi/processors/kafka/TestPutKafka.java | 2 +- .../processors/monitor/MonitorThreshold.java | 8 +- .../apache/nifi/processors/jms/GetJMSQueue.java | 8 +- .../apache/nifi/processors/jms/GetJMSTopic.java | 14 +- .../org/apache/nifi/processors/jms/PutJMS.java | 6 +- .../standard/Base64EncodeContent.java | 41 ++-- .../processors/standard/CompressContent.java | 10 +- .../nifi/processors/standard/ControlRate.java | 8 +- .../standard/ConvertCharacterSet.java | 10 +- .../processors/standard/DetectDuplicate.java | 8 +- .../processors/standard/DistributeLoad.java | 14 +- .../processors/standard/EncryptContent.java | 10 +- .../standard/EvaluateRegularExpression.java | 10 +- .../nifi/processors/standard/EvaluateXPath.java | 12 +- .../processors/standard/EvaluateXQuery.java | 10 +- .../standard/ExecuteStreamCommand.java | 8 +- .../processors/standard/GenerateFlowFile.java | 8 +- .../apache/nifi/processors/standard/GetFTP.java | 6 +- .../nifi/processors/standard/GetFile.java | 8 +- .../processors/standard/GetFileTransfer.java | 2 +- .../nifi/processors/standard/GetHTTP.java | 6 +- .../nifi/processors/standard/GetSFTP.java | 6 +- .../nifi/processors/standard/HashAttribute.java | 10 +- .../nifi/processors/standard/HashContent.java | 8 +- .../processors/standard/IdentifyMimeType.java | 10 +- .../nifi/processors/standard/InvokeHTTP.java | 6 +- .../nifi/processors/standard/ListenHTTP.java | 8 +- .../nifi/processors/standard/ListenUDP.java | 12 +- .../nifi/processors/standard/LogAttribute.java | 8 +- .../nifi/processors/standard/MergeContent.java | 12 +- .../nifi/processors/standard/ModifyBytes.java | 8 +- .../processors/standard/MonitorActivity.java | 10 +- .../nifi/processors/standard/PostHTTP.java | 10 +- .../nifi/processors/standard/PutEmail.java | 6 +- .../apache/nifi/processors/standard/PutFTP.java | 8 +- .../nifi/processors/standard/PutFile.java | 6 +- .../nifi/processors/standard/PutSFTP.java | 6 +- .../nifi/processors/standard/ReplaceText.java | 10 +- .../standard/ReplaceTextWithMapping.java | 10 +- .../processors/standard/RouteOnAttribute.java | 10 +- .../processors/standard/RouteOnContent.java | 10 +- .../nifi/processors/standard/ScanAttribute.java | 12 +- .../nifi/processors/standard/ScanContent.java | 10 +- .../processors/standard/SegmentContent.java | 10 +- .../nifi/processors/standard/SplitContent.java | 10 +- .../nifi/processors/standard/SplitText.java | 10 +- .../nifi/processors/standard/SplitXml.java | 10 +- .../nifi/processors/standard/TransformXml.java | 10 +- .../nifi/processors/standard/UnpackContent.java | 10 +- .../nifi/processors/standard/ValidateXml.java | 12 +- .../standard/RESTServiceContentModified.java | 2 +- .../standard/TestCompressContent.java | 5 +- .../standard/TestConvertCharacterSet.java | 3 - .../standard/TestDetectDuplicate.java | 4 +- .../nifi/processors/standard/TestGetHTTP.java | 1 - .../cache/server/DistributedCacheServer.java | 2 +- .../processors/attributes/UpdateAttribute.java | 11 +- .../AbstractSessionFactoryProcessor.java | 4 +- .../annotation/CapabilityDescription.java | 3 + .../nifi/processor/annotation/EventDriven.java | 3 + .../nifi/processor/annotation/OnAdded.java | 2 + .../nifi/processor/annotation/OnRemoved.java | 2 + .../nifi/processor/annotation/OnScheduled.java | 4 +- .../nifi/processor/annotation/OnShutdown.java | 2 + .../nifi/processor/annotation/OnStopped.java | 2 + .../processor/annotation/OnUnscheduled.java | 2 + .../processor/annotation/SideEffectFree.java | 2 + .../processor/annotation/SupportsBatching.java | 2 + .../apache/nifi/processor/annotation/Tags.java | 3 + .../processor/annotation/TriggerSerially.java | 4 +- .../TriggerWhenAnyDestinationAvailable.java | 2 + .../processor/annotation/TriggerWhenEmpty.java | 2 + 104 files changed, 1195 insertions(+), 411 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/EventDriven.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/EventDriven.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/EventDriven.java new file mode 100644 index 0000000..279a49e --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/EventDriven.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * Annotation that may be placed on a Processor that indicates to the framework + * that the Processor is eligible to be scheduled to run based on the occurrence + * of an "Event" (e.g., when a FlowFile is enqueued in an incoming Connection), + * rather than being triggered periodically. + * </p> + * + * <p> + * This Annotation should not be used in conjunction with + * {@link TriggerSerially} or {@link TriggerWhenEmpty}. If this Annotation is + * used with either of these other Annotations, the Processor will not be + * eligible to be scheduled in Event-Driven mode. + * </p> + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface EventDriven { + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SideEffectFree.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SideEffectFree.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SideEffectFree.java new file mode 100644 index 0000000..f32acc3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SideEffectFree.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} + * implementation can use to indicate that its + * operations on FlowFiles can be safely repeated across process sessions. If a + * processor has this annotation and it allows the framework to manage session + * commit and rollback then the framework may elect to cascade a + * {@link org.apache.nifi.processor.ProcessSession ProcessSession} given to this + * processor's onTrigger method to the + * onTrigger method of another processor. It can do this knowing that if + * something fails along a series of processors using this same session that it + * can all be safely rolled back without any ill effects on external services + * which could not be rolled back and thus all the processes could be safely + * repeated (implied idempotent behavior). + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface SideEffectFree { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java new file mode 100644 index 0000000..f5fd61f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/SupportsBatching.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a Processor implementation can use to indicate that users + * should be able to supply a Batch Duration for the Processor. If a Processor + * uses this annotation, it is allowing the Framework to batch + * {@link nifi.processor.ProcessSession ProcessSession}s' commits, as well as + * allowing the Framework to return the same ProcessSession multiple times from + * subsequent calls to + * {@link nifi.processor.ProcessSessionFactory ProcessSessionFactory}. + * {@link nifi.processor.ProcessSessionFactory#createSession() createSession()}. + * + * When this Annotation is used, it is important to note that calls to + * {@link nifi.processor.ProcessSession#commit() ProcessSession.commit()} may + * not provide a guarantee that the data has been safely stored in NiFi's + * Content Repository or FlowFile Repository. Therefore, it is not appropriate, + * for instance, to use this annotation if the Processor will call + * ProcessSession.commit() to ensure data is persisted before deleting the data + * from a remote source. + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface SupportsBatching { + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerSerially.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerSerially.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerSerially.java new file mode 100644 index 0000000..7bf7d0b --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerSerially.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} + * implementation can use to indicate that the + * Processor is not safe for concurrent execution of its onTrigger() + * method. By default, Processors are assumed to be safe for concurrent + * execution. + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface TriggerSerially { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenAnyDestinationAvailable.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenAnyDestinationAvailable.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenAnyDestinationAvailable.java new file mode 100644 index 0000000..803aa2f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenAnyDestinationAvailable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} + * implementation can use to indicate that the + * Processor is to be triggered if any of its destinations has available space + * for incoming FlowFiles. By default, Processors are triggered only when all + * destinations report that they have available space (i.e., none of the outgoing + * Connections is full). + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface TriggerWhenAnyDestinationAvailable { + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java new file mode 100644 index 0000000..fed9b34 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/behavior/TriggerWhenEmpty.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.behavior; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} + * implementation can use to indicate that the + * Processor should still be triggered even when it has no data in its work + * queue. By default, Processors which have no non-self incoming edges will be + * triggered even if there is no work in its queue. However, Processors that + * have non-self incoming edges will only be triggered if they have work in + * their queue or they present this annotation. + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface TriggerWhenEmpty { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/CapabilityDescription.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/CapabilityDescription.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/CapabilityDescription.java new file mode 100644 index 0000000..d69788a --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/CapabilityDescription.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.documentation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation that may be placed on a {@link org.apache.nifi.processor.Processor Processor}, + * {@link org.apache.nifi.controller.ControllerService ControllerService}, or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} allowing for a description to be + * provided. This description can be provided to a user in logs, UI, etc. + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface CapabilityDescription { + + String value(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/Tags.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/Tags.java b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/Tags.java new file mode 100644 index 0000000..8bd8f9a --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/documentation/Tags.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.documentation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation that can be applied to a {@link org.apache.nifi.processor.Processor Processor}, + * {@link org.apache.nifi.controller.ControllerService ControllerService}, or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} in order to associate + * tags (keywords) with the component. These tags do not affect the component in + * any way but serve as additional documentation and can be used to sort/filter + * Processors. + * + * @author none + */ +@Documented +@Target({ElementType.TYPE}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface Tags { + + /** + * @return all tag values associated with the given processor + */ + public String[] value(); +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java new file mode 100644 index 0000000..acb7a4d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnAdded.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor}, + * {@link org.apache.nifi.controller.ControllerService ControllerService}, or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} + * implementation can use to indicate a method + * should be called whenever the component is added to the flow. This method + * will be called once for the entire life of a component instance. + * + * If any method annotated with this annotation throws a Throwable, the component + * will not be added to the flow. + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnAdded { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnRemoved.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnRemoved.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnRemoved.java new file mode 100644 index 0000000..696159f --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnRemoved.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor}, + * {@link org.apache.nifi.controller.ControllerService ControllerService}, or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} implementation + * can use to indicate a method should be called whenever the component is removed + * from the flow. This method will be called once for the entire life of a + * component instance. If the method throw any Throwable, that Throwable will be + * caught and logged but will not prevent subsequent methods with this annotation + * or removal of the component from the flow. + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnRemoved { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java new file mode 100644 index 0000000..9dfd150 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} implementation + * can use to indicate a method should be called whenever the component is scheduled + * to run. This will be called before any call to 'onTrigger' and will be called once each time + * a Processor or Reporting Task is scheduled to run. + * + * <p> + * Methods using this annotation must take either 0 arguments or a single argument. + * </p> + * + * <p> + * If using 1 argument and the component using the annotation is a Processor, that argument must + * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}. + * </p> + * + * <p> + * If using 1 argument and the component using the annotation is a Reporting Task, that argument must + * be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}. + * </p> + * + * If any method annotated with this annotation throws any Throwable, the framework will wait a while + * and then attempt to invoke the method again. This will continue until the method succeeds, and the + * component will then be scheduled to run after this method return successfully. + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnScheduled { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnShutdown.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnShutdown.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnShutdown.java new file mode 100644 index 0000000..a4129e1 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnShutdown.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor}, + * {@link org.apache.nifi.controller.ControllerService ControllerService}, or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} implementation + * can use to indicate a method should be called whenever the flow is being shutdown. + * This will be called at most once for each component in a JVM lifetime. + * It is not, however, guaranteed that this method will be called on shutdown, as + * the service may be killed suddenly. + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnShutdown { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnStopped.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnStopped.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnStopped.java new file mode 100644 index 0000000..4715253 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnStopped.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} + * implementation can use to indicate that a method + * should be called whenever the component is no longer scheduled to run. + * Methods marked with this annotation will be invoked each time the component + * is stopped and will be invoked only after the last thread has returned from + * the <code>onTrigger</code> method. + * </p> + * + * <p> + * This means that the thread executing in this method will be the only thread + * executing in any part of the Processor. However, since other threads may + * later execute other parts of the code, member variables must still be + * protected appropriately. However, access to multiple variables need not be + * atomic. + * </p> + * + * <p> + * To indicate that a method should be called immediately when a component is no + * longer scheduled to run (as opposed to after all threads have returned from the + * <code>onTrigger</code> method), see the {@link OnUnscheduled} annotation. + * </p> + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnStopped { + +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java new file mode 100644 index 0000000..68d0fe8 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.annotation.lifecycle; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * <p> + * Marker annotation a {@link org.apache.nifi.processor.Processor Processor} or + * {@link org.apache.nifi.reporting.ReportingTask ReportingTask} + * should be called whenever the component is no longer scheduled to run. + * Methods marked with this annotation will be invoked each time the framework + * is notified to stop scheduling the component. This method is invoked as other + * threads are potentially running. To invoke a method after all threads have + * finished processing, see the {@link OnStopped} annotation. + * </p> + * + * @author none + */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +public @interface OnUnscheduled { +} http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java index 346e801..860ea2d 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -49,6 +49,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.net.ssl.SSLContext; import org.apache.nifi.admin.service.UserService; +import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.cluster.BulletinsPayload; import org.apache.nifi.cluster.HeartbeatPayload; import org.apache.nifi.cluster.protocol.DataFlow; @@ -142,7 +143,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessorInitializationContext; import org.apache.nifi.processor.StandardValidationContextFactory; -import org.apache.nifi.processor.annotation.OnAdded; import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.provenance.ProvenanceEventType; @@ -785,6 +785,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H * @throws ProcessorInstantiationException if the processor cannot be * instantiated for any reason */ + @SuppressWarnings("deprecation") public ProcessorNode createProcessor(final String type, String id, final boolean firstTimeAdded) throws ProcessorInstantiationException { id = id.intern(); final Processor processor = instantiateProcessor(type, id); @@ -796,7 +797,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H if ( firstTimeAdded ) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, processor); + ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, org.apache.nifi.processor.annotation.OnAdded.class, processor); } catch (final Exception e) { logRepository.removeObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID); throw new ProcessorLifeCycleException("Failed to invoke @OnAdded methods of " + procNode.getProcessor(), e); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java index 071be4d..fe72ae4 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java @@ -16,9 +16,17 @@ */ package org.apache.nifi.controller; +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.TriggerSerially; +import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable; +import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; +import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ValidationContextFactory; import org.apache.nifi.controller.ProcessorNode; + import static java.util.Objects.requireNonNull; import java.util.ArrayList; @@ -53,16 +61,8 @@ import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.Processor; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.CapabilityDescription; -import org.apache.nifi.processor.annotation.EventDriven; -import org.apache.nifi.processor.annotation.SideEffectFree; -import org.apache.nifi.processor.annotation.SupportsBatching; -import org.apache.nifi.processor.annotation.TriggerSerially; -import org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable; -import org.apache.nifi.processor.annotation.TriggerWhenEmpty; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; - import org.apache.commons.lang3.builder.EqualsBuilder; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.quartz.CronExpression; @@ -119,6 +119,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable private SchedulingStrategy schedulingStrategy; // guarded by read/write lock + @SuppressWarnings("deprecation") StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory, final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) { super(processor, uuid, validationContextFactory, controllerServiceProvider); @@ -150,13 +151,14 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable isolated = new AtomicBoolean(false); penalizationPeriod = new AtomicReference<>(DEFAULT_PENALIZATION_PERIOD); - triggerWhenEmpty = processor.getClass().isAnnotationPresent(TriggerWhenEmpty.class); - sideEffectFree = processor.getClass().isAnnotationPresent(SideEffectFree.class); - batchSupported = processor.getClass().isAnnotationPresent(SupportsBatching.class); - triggeredSerially = processor.getClass().isAnnotationPresent(TriggerSerially.class); - triggerWhenAnyDestinationAvailable = processor.getClass().isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class); + final Class<?> procClass = processor.getClass(); + triggerWhenEmpty = procClass.isAnnotationPresent(TriggerWhenEmpty.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenEmpty.class); + sideEffectFree = procClass.isAnnotationPresent(SideEffectFree.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SideEffectFree.class); + batchSupported = procClass.isAnnotationPresent(SupportsBatching.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.SupportsBatching.class); + triggeredSerially = procClass.isAnnotationPresent(TriggerSerially.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerSerially.class); + triggerWhenAnyDestinationAvailable = procClass.isAnnotationPresent(TriggerWhenAnyDestinationAvailable.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.TriggerWhenAnyDestinationAvailable.class); this.validationContextFactory = validationContextFactory; - eventDrivenSupported = processor.getClass().isAnnotationPresent(EventDriven.class) && !triggeredSerially && !triggerWhenEmpty; + eventDrivenSupported = (procClass.isAnnotationPresent(EventDriven.class) || procClass.isAnnotationPresent(org.apache.nifi.processor.annotation.EventDriven.class) )&& !triggeredSerially && !triggerWhenEmpty; schedulingStrategy = SchedulingStrategy.TIMER_DRIVEN; } @@ -354,9 +356,21 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable * @return the value of the processor's {@link CapabilityDescription} * annotation, if one exists, else <code>null</code>. */ + @SuppressWarnings("deprecation") public String getProcessorDescription() { - final CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); - return (capDesc == null) ? null : capDesc.value(); + CapabilityDescription capDesc = processor.getClass().getAnnotation(CapabilityDescription.class); + String description = null; + if ( capDesc != null ) { + description = capDesc.value(); + } else { + final org.apache.nifi.processor.annotation.CapabilityDescription deprecatedCapDesc = + processor.getClass().getAnnotation(org.apache.nifi.processor.annotation.CapabilityDescription.class); + if ( deprecatedCapDesc != null ) { + description = deprecatedCapDesc.value(); + } + } + + return description; } @Override http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java index af801bb..7455bf8 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/EventDrivenSchedulingAgent.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.EventBasedWorker; import org.apache.nifi.controller.EventDrivenWorkerQueue; @@ -41,12 +42,10 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnStopped; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.Connectables; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.ReflectionUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,6 +261,7 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } } + @SuppressWarnings("deprecation") private void trigger(final Connectable worker, final ScheduleState scheduleState, final ConnectableProcessContext processContext, final ProcessSessionFactory sessionFactory) { final int newThreadCount = scheduleState.incrementActiveThreadCount(); if (newThreadCount > worker.getMaxConcurrentTasks() && worker.getMaxConcurrentTasks() > 0) { @@ -294,7 +294,7 @@ public class EventDrivenSchedulingAgent implements SchedulingAgent { } finally { if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, worker, processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, worker, processContext); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 7fc65f9..5950b4e 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -27,6 +27,9 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.Funnel; import org.apache.nifi.connectable.Port; @@ -47,15 +50,11 @@ import org.apache.nifi.processor.SchedulingContext; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.StandardSchedulingContext; -import org.apache.nifi.processor.annotation.OnScheduled; -import org.apache.nifi.processor.annotation.OnStopped; -import org.apache.nifi.processor.annotation.OnUnscheduled; import org.apache.nifi.reporting.ReportingTask; import org.apache.nifi.scheduling.SchedulingStrategy; import org.apache.nifi.util.FormatUtils; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -209,6 +208,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { scheduleState.setScheduled(false); final Runnable unscheduleReportingTaskRunnable = new Runnable() { + @SuppressWarnings("deprecation") @Override public void run() { final ConfigurationContext configurationContext = taskNode.getConfigurationContext(); @@ -216,7 +216,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { while (true) { try { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext); + ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext); } break; } catch (final InvocationTargetException ite) { @@ -241,7 +241,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { agent.unschedule(taskNode, scheduleState); if (scheduleState.getActiveThreadCount() == 0) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext); } } }; @@ -276,6 +276,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } final Runnable startProcRunnable = new Runnable() { + @SuppressWarnings("deprecation") @Override public void run() { try (final NarCloseable x = NarCloseable.withNarLoader()) { @@ -297,7 +298,7 @@ public final class StandardProcessScheduler implements ProcessScheduler { } final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, procNode); - ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, procNode.getProcessor(), schedulingContext); + ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, procNode.getProcessor(), schedulingContext); getSchedulingAgent(procNode).schedule(procNode, scheduleState); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java index c04a04f..aca870b 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java @@ -18,6 +18,7 @@ package org.apache.nifi.controller.tasks; import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.controller.repository.StandardProcessSessionFactory; import org.apache.nifi.controller.scheduling.ConnectableProcessContext; @@ -26,11 +27,9 @@ import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.encrypt.StringEncryptor; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.annotation.OnStopped; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.Connectables; import org.apache.nifi.util.ReflectionUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +49,7 @@ public class ContinuallyRunConnectableTask implements Runnable { this.processContext = new ConnectableProcessContext(connectable, encryptor); } + @SuppressWarnings("deprecation") @Override public void run() { if (!scheduleState.isScheduled()) { @@ -86,7 +86,7 @@ public class ContinuallyRunConnectableTask implements Runnable { } finally { if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, connectable, processContext); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java index 65c375f..33bd327 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.controller.FlowController; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.repository.BatchingSessionFactory; @@ -36,11 +37,9 @@ import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.processor.ProcessSessionFactory; import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnStopped; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.util.Connectables; import org.apache.nifi.util.ReflectionUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +68,7 @@ public class ContinuallyRunProcessorTask implements Runnable { this.processContext = new StandardProcessContext(procNode, flowController, encryptor); } + @SuppressWarnings("deprecation") @Override public void run() { // make sure processor is not yielded @@ -163,7 +163,7 @@ public class ContinuallyRunProcessorTask implements Runnable { // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext); flowController.heartbeat(); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java index 36aa9dd..9b70581 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java @@ -16,12 +16,11 @@ */ package org.apache.nifi.controller.tasks; +import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.controller.ReportingTaskNode; import org.apache.nifi.controller.scheduling.ScheduleState; import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.processor.annotation.OnStopped; import org.apache.nifi.util.ReflectionUtils; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +36,7 @@ public class ReportingTaskWrapper implements Runnable { this.scheduleState = scheduleState; } + @SuppressWarnings("deprecation") @Override public synchronized void run() { scheduleState.incrementActiveThreadCount(); @@ -52,7 +52,7 @@ public class ReportingTaskWrapper implements Runnable { // invoke the OnStopped methods if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext()); } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 8cff5dd..856ccc1 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import org.apache.nifi.annotation.lifecycle.OnRemoved; +import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.connectable.Connectable; import org.apache.nifi.connectable.ConnectableType; import org.apache.nifi.connectable.Connection; @@ -46,10 +48,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.NarCloseable; -import org.apache.nifi.processor.SimpleProcessLogger; import org.apache.nifi.processor.StandardProcessContext; -import org.apache.nifi.processor.annotation.OnRemoved; -import org.apache.nifi.processor.annotation.OnShutdown; import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; @@ -326,10 +325,11 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @SuppressWarnings("deprecation") private void shutdown(final ProcessGroup procGroup) { for (final ProcessorNode node : procGroup.getProcessors()) { try (final NarCloseable x = NarCloseable.withNarLoader()) { - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, node.getProcessor()); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, node.getProcessor()); } } @@ -652,6 +652,7 @@ public final class StandardProcessGroup implements ProcessGroup { } } + @SuppressWarnings("deprecation") @Override public void removeProcessor(final ProcessorNode processor) { final String id = requireNonNull(processor).getIdentifier(); @@ -668,7 +669,7 @@ public final class StandardProcessGroup implements ProcessGroup { try (final NarCloseable x = NarCloseable.withNarLoader()) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor); - ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, org.apache.nifi.processor.annotation.OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java index e15e00a..f8e7da4 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java @@ -19,6 +19,9 @@ package org.apache.nifi.util; import java.lang.annotation.Annotation; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,54 +45,92 @@ public class ReflectionUtils { * @throws IllegalAccessException */ public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { - try { - for (final Method method : instance.getClass().getMethods()) { - if (method.isAnnotationPresent(annotation)) { - final boolean isAccessible = method.isAccessible(); - method.setAccessible(true); - - try { - final Class<?>[] argumentTypes = method.getParameterTypes(); - if (argumentTypes.length > args.length) { - throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given", - method.getName(), instance, argumentTypes.length, args.length)); - } + invokeMethodsWithAnnotation(annotation, null, instance, args); + } + - for (int i = 0; i < argumentTypes.length; i++) { - final Class<?> argType = argumentTypes[i]; - if (!argType.isAssignableFrom(args[i].getClass())) { - throw new IllegalArgumentException(String.format( - "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s", - method.getName(), instance, i, argType, args[i].getClass())); + /** + * Invokes all methods on the given instance that have been annotated with + * the given preferredAnnotation and if no such method exists will invoke all + * methods on the given instance that have been annotated with the given + * alternateAnnotation, if any exists. If the signature of the method that is defined in + * <code>instance</code> uses 1 or more parameters, those parameters must be + * specified by the <code>args</code> parameter. However, if more arguments + * are supplied by the <code>args</code> parameter than needed, the extra + * arguments will be ignored. + * + * @param preferredAnnotation + * @param alternateAnnotation + * @param instance + * @param args + * @throws InvocationTargetException + * @throws IllegalArgumentException + * @throws IllegalAccessException + */ + public static void invokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) throws IllegalAccessException, IllegalArgumentException, InvocationTargetException { + final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2); + annotationClasses.add(preferredAnnotation); + if ( alternateAnnotation != null ) { + annotationClasses.add(alternateAnnotation); + } + + boolean annotationFound = false; + for ( final Class<? extends Annotation> annotationClass : annotationClasses ) { + if ( annotationFound ) { + break; + } + + try { + for (final Method method : instance.getClass().getMethods()) { + if (method.isAnnotationPresent(annotationClass)) { + annotationFound = true; + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class<?>[] argumentTypes = method.getParameterTypes(); + if (argumentTypes.length > args.length) { + throw new IllegalArgumentException(String.format("Unable to invoke method %1$s on %2$s because method expects %3$s parameters but only %4$s were given", + method.getName(), instance, argumentTypes.length, args.length)); } - } - - if (argumentTypes.length == args.length) { - method.invoke(instance, args); - } else { - final Object[] argsToPass = new Object[argumentTypes.length]; - for (int i = 0; i < argsToPass.length; i++) { - argsToPass[i] = args[i]; + + for (int i = 0; i < argumentTypes.length; i++) { + final Class<?> argType = argumentTypes[i]; + if (!argType.isAssignableFrom(args[i].getClass())) { + throw new IllegalArgumentException(String.format( + "Unable to invoke method %1$s on %2$s because method parameter %3$s is expected to be of type %4$s but argument passed was of type %5$s", + method.getName(), instance, i, argType, args[i].getClass())); + } + } + + if (argumentTypes.length == args.length) { + method.invoke(instance, args); + } else { + final Object[] argsToPass = new Object[argumentTypes.length]; + for (int i = 0; i < argsToPass.length; i++) { + argsToPass[i] = args[i]; + } + + method.invoke(instance, argsToPass); + } + } finally { + if (!isAccessible) { + method.setAccessible(false); } - - method.invoke(instance, argsToPass); - } - } finally { - if (!isAccessible) { - method.setAccessible(false); } } } - } - } catch (final InvocationTargetException ite) { - if ( ite.getCause() instanceof RuntimeException ) { - throw (RuntimeException) ite.getCause(); - } else { - throw ite; + } catch (final InvocationTargetException ite) { + if ( ite.getCause() instanceof RuntimeException ) { + throw (RuntimeException) ite.getCause(); + } else { + throw ite; + } } } } + /** * Invokes all methods on the given instance that have been annotated with * the given Annotation. If the signature of the method that is defined in @@ -107,47 +148,86 @@ public class ReflectionUtils { * is returned, an error will have been logged. */ public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) { - for (final Method method : instance.getClass().getMethods()) { - if (method.isAnnotationPresent(annotation)) { - final boolean isAccessible = method.isAccessible(); - method.setAccessible(true); - - try { - final Class<?>[] argumentTypes = method.getParameterTypes(); - if (argumentTypes.length > args.length) { - LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given", - new Object[]{method.getName(), instance, argumentTypes.length, args.length}); - return false; - } - - for (int i = 0; i < argumentTypes.length; i++) { - final Class<?> argType = argumentTypes[i]; - if (!argType.isAssignableFrom(args[i].getClass())) { - LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}", - new Object[]{method.getName(), instance, i, argType, args[i].getClass()}); + return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, args); + } + + + /** + * Invokes all methods on the given instance that have been annotated with + * the given preferredAnnotation and if no such method exists will invoke all methods + * on the given instance that have been annotated with the given + * alternateAnnotation, if any exists. If the signature of the method that is defined in + * <code>instance</code> uses 1 or more parameters, those parameters must be + * specified by the <code>args</code> parameter. However, if more arguments + * are supplied by the <code>args</code> parameter than needed, the extra + * arguments will be ignored. + * + * @param preferredAnnotation + * @param alternateAnnotation + * @param instance + * @param args + * @return <code>true</code> if all appropriate methods were invoked and + * returned without throwing an Exception, <code>false</code> if one of the + * methods threw an Exception or could not be invoked; if <code>false</code> + * is returned, an error will have been logged. + */ + public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) { + final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2); + annotationClasses.add(preferredAnnotation); + if ( alternateAnnotation != null ) { + annotationClasses.add(alternateAnnotation); + } + + boolean annotationFound = false; + for ( final Class<? extends Annotation> annotationClass : annotationClasses ) { + if ( annotationFound ) { + break; + } + + for (final Method method : instance.getClass().getMethods()) { + if (method.isAnnotationPresent(annotationClass)) { + annotationFound = true; + + final boolean isAccessible = method.isAccessible(); + method.setAccessible(true); + + try { + final Class<?>[] argumentTypes = method.getParameterTypes(); + if (argumentTypes.length > args.length) { + LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given", + new Object[]{method.getName(), instance, argumentTypes.length, args.length}); return false; } - } - - try { - if (argumentTypes.length == args.length) { - method.invoke(instance, args); - } else { - final Object[] argsToPass = new Object[argumentTypes.length]; - for (int i = 0; i < argsToPass.length; i++) { - argsToPass[i] = args[i]; + + for (int i = 0; i < argumentTypes.length; i++) { + final Class<?> argType = argumentTypes[i]; + if (!argType.isAssignableFrom(args[i].getClass())) { + LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}", + new Object[]{method.getName(), instance, i, argType, args[i].getClass()}); + return false; } - - method.invoke(instance, argsToPass); } - } catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) { - LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t}); - LOG.error("", t); - return false; - } - } finally { - if (!isAccessible) { - method.setAccessible(false); + + try { + if (argumentTypes.length == args.length) { + method.invoke(instance, args); + } else { + final Object[] argsToPass = new Object[argumentTypes.length]; + for (int i = 0; i < argsToPass.length; i++) { + argsToPass[i] = args[i]; + } + + method.invoke(instance, argsToPass); + } + } catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) { + LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t}); + LOG.error("", t); + return false; + } + } finally { + if (!isAccessible) { + method.setAccessible(false); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/1c0eb6c6/nifi/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/test/processors/StubAttributeLoggerProcessor.java ---------------------------------------------------------------------- diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/test/processors/StubAttributeLoggerProcessor.java b/nifi/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/test/processors/StubAttributeLoggerProcessor.java index 73d38e8..d49db29 100644 --- a/nifi/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/test/processors/StubAttributeLoggerProcessor.java +++ b/nifi/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/test/processors/StubAttributeLoggerProcessor.java @@ -22,13 +22,13 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import org.apache.nifi.annotation.behavior.SideEffectFree; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.annotation.SideEffectFree; @SideEffectFree public class StubAttributeLoggerProcessor extends AbstractProcessor {
