http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java index 1901fb6..a600699 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java @@ -28,123 +28,149 @@ import org.apache.nifi.controller.ControllerServiceLookup; public interface ControllerServiceProvider extends ControllerServiceLookup { /** - * Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeadded</code> - * is true, calls any methods that are annotated with {@link OnAdded} + * Creates a new Controller Service of the specified type and assigns it the + * given id. If <code>firstTimeadded</code> is true, calls any methods that + * are annotated with {@link OnAdded} * - * @param type - * @param id - * @param firstTimeAdded - * @return + * @param type of service + * @param id of service + * @param firstTimeAdded for service + * @return the service node */ ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded); /** - * Gets the controller service node for the specified identifier. 
Returns + * @param id of the service + * @return the controller service node for the specified identifier. Returns * <code>null</code> if the identifier does not match a known service - * - * @param id - * @return */ ControllerServiceNode getControllerServiceNode(String id); - + /** - * Removes the given Controller Service from the flow. This will call all appropriate methods - * that have the @OnRemoved annotation. - * + * Removes the given Controller Service from the flow. This will call all + * appropriate methods that have the @OnRemoved annotation. + * * @param serviceNode the controller service to remove - * - * @throws IllegalStateException if the controller service is not disabled or is not a part of this flow + * + * @throws IllegalStateException if the controller service is not disabled + * or is not a part of this flow */ void removeControllerService(ControllerServiceNode serviceNode); - + /** - * Enables the given controller service that it can be used by other components - * @param serviceNode + * Enables the given controller service that it can be used by other + * components + * + * @param serviceNode the service node */ void enableControllerService(ControllerServiceNode serviceNode); - + /** - * Enables the collection of services. If a service in this collection depends on another service, - * the service being depended on must either already be enabled or must be in the collection as well. - * @param serviceNodes + * Enables the collection of services. If a service in this collection + * depends on another service, the service being depended on must either + * already be enabled or must be in the collection as well. + * + * @param serviceNodes the nodes */ void enableControllerServices(Collection<ControllerServiceNode> serviceNodes); - + /** - * Disables the given controller service so that it cannot be used by other components. This allows - * configuration to be updated or allows service to be removed. 
- * @param serviceNode + * Disables the given controller service so that it cannot be used by other + * components. This allows configuration to be updated or allows service to + * be removed. + * + * @param serviceNode the node */ void disableControllerService(ControllerServiceNode serviceNode); - + /** - * Returns a Set of all Controller Services that exist for this service provider. - * @return + * @return a Set of all Controller Services that exist for this service + * provider */ Set<ControllerServiceNode> getAllControllerServices(); - + /** - * Verifies that all running Processors and Reporting Tasks referencing the Controller Service (or a service - * that depends on the provided service) can be stopped. - * @param serviceNode - * - * @throws IllegalStateException if any referencing component cannot be stopped + * Verifies that all running Processors and Reporting Tasks referencing the + * Controller Service (or a service that depends on the provided service) + * can be stopped. + * + * @param serviceNode the node + * + * @throws IllegalStateException if any referencing component cannot be + * stopped */ void verifyCanStopReferencingComponents(ControllerServiceNode serviceNode); - + /** - * Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given - * Controller Service. For any Controller services that reference this one, its schedulable referencing components will also - * be unscheduled. - * @param serviceNode + * Recursively unschedules all schedulable components (Processors and + * Reporting Tasks) that reference the given Controller Service. For any + * Controller services that reference this one, its schedulable referencing + * components will also be unscheduled. + * + * @param serviceNode the node */ void unscheduleReferencingComponents(ControllerServiceNode serviceNode); - + /** - * Verifies that all Controller Services referencing the provided Controller Service can be disabled. 
- * @param serviceNode - * - * @throws IllegalStateException if any referencing service cannot be disabled + * Verifies that all Controller Services referencing the provided Controller + * Service can be disabled. + * + * @param serviceNode the node + * + * @throws IllegalStateException if any referencing service cannot be + * disabled */ void verifyCanDisableReferencingServices(ControllerServiceNode serviceNode); - + /** - * Disables any Controller Service that references the provided Controller Service. This action is performed recursively - * so that if service A references B and B references C, disabling references for C will first disable A, then B. - * @param serviceNode + * Disables any Controller Service that references the provided Controller + * Service. This action is performed recursively so that if service A + * references B and B references C, disabling references for C will first + * disable A, then B. + * + * @param serviceNode the node */ void disableReferencingServices(ControllerServiceNode serviceNode); - + /** - * Verifies that all Controller Services referencing the provided ControllerService can be enabled. - * @param serviceNode - * - * @throws IllegalStateException if any referencing component cannot be enabled + * Verifies that all Controller Services referencing the provided + * ControllerService can be enabled. + * + * @param serviceNode the node + * + * @throws IllegalStateException if any referencing component cannot be + * enabled */ void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode); - - + /** - * Enables all Controller Services that are referencing the given service. If Service A references Service B and Service - * B references serviceNode, Service A and B will both be enabled. - * @param serviceNode + * Enables all Controller Services that are referencing the given service. + * If Service A references Service B and Service B references serviceNode, + * Service A and B will both be enabled. 
+ * + * @param serviceNode the node */ void enableReferencingServices(ControllerServiceNode serviceNode); - + /** - * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on - * the provided service) can be scheduled to run. - * @param serviceNode - * - * @throws IllegalStateException if any referencing component cannot be scheduled + * Verifies that all enabled Processors referencing the ControllerService + * (or a service that depends on the provided service) can be scheduled to + * run. + * + * @param serviceNode the node + * + * @throws IllegalStateException if any referencing component cannot be + * scheduled */ void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode); - + /** - * Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service - * to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode, - * then the Processor will also be started. - * @param serviceNode + * Schedules any schedulable component (Processor, ReportingTask) that is + * referencing the given Controller Service to run. This is performed + * recursively, so if a Processor is referencing Service A, which is + * referencing serviceNode, then the Processor will also be started. + * + * @param serviceNode the node */ void scheduleReferencingComponents(ControllerServiceNode serviceNode); }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java index 67ffb6c..df18c62 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java @@ -26,26 +26,21 @@ import org.apache.nifi.controller.ConfiguredComponent; public interface ControllerServiceReference { /** - * Returns the component that is being referenced - * - * @return + * @return the component that is being referenced */ ControllerServiceNode getReferencedComponent(); /** - * Returns a {@link Set} of all components that are referencing this + * @return a {@link Set} of all components that are referencing this * Controller Service - * - * @return */ Set<ConfiguredComponent> getReferencingComponents(); /** - * Returns a {@link Set} of all Processors, Reporting Tasks, and Controller Services that are - * referencing the Controller Service and are running (in the case of Processors and Reporting Tasks) - * or enabled (in the case of Controller Services) - * - * @return + * @return a {@link Set} of all Processors, Reporting Tasks, and Controller + * Services that are referencing the Controller Service and are running (in + * the case of Processors 
and Reporting Tasks) or enabled (in the case of + * Controller Services) */ Set<ConfiguredComponent> getActiveReferences(); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java index 2ed8fd9..63840e7 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java @@ -16,30 +16,28 @@ */ package org.apache.nifi.controller.service; - /** * Represents the valid states for a Controller Service. */ public enum ControllerServiceState { + /** * Controller Service is disabled and cannot be used. */ DISABLED, - /** - * Controller Service has been disabled but has not yet finished its lifecycle - * methods. + * Controller Service has been disabled but has not yet finished its + * lifecycle methods. */ DISABLING, - /** - * Controller Service has been enabled but has not yet finished its lifecycle methods. + * Controller Service has been enabled but has not yet finished its + * lifecycle methods. */ ENABLING, - /** - * Controller Service has been enabled and has finished its lifecycle methods. The Controller SErvice - * is ready to be used. + * Controller Service has been enabled and has finished its lifecycle + * methods. 
The Controller Service is ready to be used. */ ENABLED; } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java index b898638..a9cfb58 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java @@ -51,7 +51,7 @@ public interface ProcessGroup { /** * Updates the ProcessGroup to point to a new parent * - * @param group + * @param group new parent group */ void setParent(ProcessGroup group); @@ -68,19 +68,18 @@ public interface ProcessGroup { /** * Updates the name of this ProcessGroup. 
* - * @param name + * @param name new name */ void setName(String name); /** * Updates the position of where this ProcessGroup is located in the graph + * @param position new position */ void setPosition(Position position); /** - * Returns the position of where this ProcessGroup is located in the graph - * - * @return + * @return the position of where this ProcessGroup is located in the graph */ Position getPosition(); @@ -93,14 +92,12 @@ public interface ProcessGroup { /** * Updates the comments for this ProcessGroup * - * @param comments + * @param comments new comments */ void setComments(String comments); /** - * Returns the counts for this ProcessGroup - * - * @return + * @return the counts for this ProcessGroup */ ProcessGroupCounts getCounts(); @@ -129,18 +126,17 @@ public interface ProcessGroup { /** * Enables the given Input Port * - * @param port + * @param port to enable */ void enableInputPort(Port port); /** * Enables the given Output Port * - * @param port + * @param port to enable */ void enableOutputPort(Port port); - /** * Starts the given Processor * @@ -153,46 +149,45 @@ public interface ProcessGroup { /** * Starts the given Input Port * - * @param port + * @param port to start */ void startInputPort(Port port); /** * Starts the given Output Port * - * @param port + * @param port to start */ void startOutputPort(Port port); /** * Starts the given Funnel * - * @param funnel + * @param funnel to start */ void startFunnel(Funnel funnel); /** * Stops the given Processor * - * @param processor + * @param processor to stop */ void stopProcessor(ProcessorNode processor); /** * Stops the given Port * - * @param processor + * @param port to stop */ void stopInputPort(Port port); /** * Stops the given Port * - * @param processor + * @param port to stop */ void stopOutputPort(Port port); - /** * Disables the given Processor * @@ -205,18 +200,17 @@ public interface ProcessGroup { /** * Disables the given Input Port * - * @param port + * @param port to 
disable */ void disableInputPort(Port port); /** * Disables the given Output Port * - * @param port + * @param port to disable */ void disableOutputPort(Port port); - /** * Indicates that the Flow is being shutdown; allows cleanup of resources * associated with processors, etc. @@ -224,10 +218,8 @@ public interface ProcessGroup { void shutdown(); /** - * Returns a boolean indicating whether or not this ProcessGroup is the root + * @return a boolean indicating whether or not this ProcessGroup is the root * group - * - * @return */ boolean isRootGroup(); @@ -236,7 +228,7 @@ public interface ProcessGroup { * external sources to {@link Processor}s and other {@link Port}s within * this ProcessGroup. * - * @param port + * @param port to add */ void addInputPort(Port port); @@ -297,7 +289,7 @@ public interface ProcessGroup { /** * Adds a reference to a ProgressGroup as a child of this. * - * @return the newly created reference + * @param group to add */ void addProcessGroup(ProcessGroup group); @@ -305,8 +297,8 @@ public interface ProcessGroup { * Returns the ProcessGroup whose parent is <code>this</code> and whose id * is given * - * @param id - * @return + * @param id identifier of group to get + * @return child group */ ProcessGroup getProcessGroup(String id); @@ -363,12 +355,10 @@ public interface ProcessGroup { ProcessorNode getProcessor(String id); /** - * Returns the <code>Connectable</code> with the given ID, or + * @param id the ID of the Connectable + * @return the <code>Connectable</code> with the given ID, or * <code>null</code> if the <code>Connectable</code> is not a member of the * group - * - * @param id the ID of the Connectable - * @return */ Connectable getConnectable(String id); @@ -377,7 +367,7 @@ public interface ProcessGroup { * the Source and Destination of the Connection that the Connection has been * established. 
* - * @param connection + * @param connection to add * @throws NullPointerException if the connection is null * @throws IllegalStateException if the source or destination of the * connection is not a member of this ProcessGroup or if a connection @@ -388,7 +378,7 @@ public interface ProcessGroup { /** * Removes the connection from this ProcessGroup. * - * @param connection + * @param connection to remove * @throws IllegalStateException if <code>connection</code> is not contained * within this. */ @@ -404,35 +394,32 @@ public interface ProcessGroup { * this method does not notify either, as both the Source and Destination * should already be aware of the Connection. * - * @param connection + * @param connection to inherit */ void inheritConnection(Connection connection); /** + * @param id identifier of connection * @return the Connection with the given ID, or <code>null</code> if the * connection does not exist. */ Connection getConnection(String id); /** - * Returns the {@link Set} of all {@link Connection}s contained within this. - * - * @return + * @return the {@link Set} of all {@link Connection}s contained within this */ Set<Connection> getConnections(); /** - * Returns a List of all Connections contains within this ProcessGroup and - * any child ProcessGroups. 
- * - * @return + * @return a List of all Connections contains within this ProcessGroup and + * any child ProcessGroups */ List<Connection> findAllConnections(); /** * Adds the given RemoteProcessGroup to this ProcessGroup * - * @param remoteGroup + * @param remoteGroup group to add * * @throws NullPointerException if the given argument is null */ @@ -441,7 +428,7 @@ public interface ProcessGroup { /** * Removes the given RemoteProcessGroup from this ProcessGroup * - * @param remoteGroup + * @param remoteGroup group to remove * @throws NullPointerException if the argument is null * @throws IllegalStateException if the given argument does not belong to * this ProcessGroup @@ -449,21 +436,17 @@ public interface ProcessGroup { void removeRemoteProcessGroup(RemoteProcessGroup remoteGroup); /** - * Returns the RemoteProcessGroup that is the child of this ProcessGroup and + * @param id identifier of group to find + * @return the RemoteProcessGroup that is the child of this ProcessGroup and * has the given ID. If no RemoteProcessGroup can be found with the given - * ID, returns <code>null</code>. - * - * @param id - * @return + * ID, returns <code>null</code> */ RemoteProcessGroup getRemoteProcessGroup(String id); /** - * Returns a set of all RemoteProcessGroups that belong to this + * @return a set of all RemoteProcessGroups that belong to this * ProcessGroup. If no RemoteProcessGroup's have been added to this - * ProcessGroup, will return an empty Set. - * - * @return + * ProcessGroup, will return an empty Set */ Set<RemoteProcessGroup> getRemoteProcessGroups(); @@ -471,7 +454,6 @@ public interface ProcessGroup { * Adds the given Label to this ProcessGroup * * @param label the label to add - * @return * * @throws NullPointerException if the argument is null */ @@ -488,155 +470,129 @@ public interface ProcessGroup { void removeLabel(Label label); /** - * Returns a set of all Labels that belong to this ProcessGroup. 
If no - * Labels belong to this ProcessGroup, returns an empty Set. - * - * @return + * @return a set of all Labels that belong to this ProcessGroup. If no + * Labels belong to this ProcessGroup, returns an empty Set */ Set<Label> getLabels(); /** - * Returns the Label that belongs to this ProcessGroup and has the given id. - * If no Label can be found with this ID, returns <code>null</code>. - * - * @param id - * @return + * @param id of the label + * @return the Label that belongs to this ProcessGroup and has the given id. + * If no Label can be found with this ID, returns <code>null</code> */ Label getLabel(String id); /** - * Returns the Process Group with the given ID, if it exists as a child of + * @param id of the group + * @return the Process Group with the given ID, if it exists as a child of * this ProcessGroup, or is this ProcessGroup. This performs a recursive * search of all ProcessGroups and descendant ProcessGroups - * - * @param id - * @return */ ProcessGroup findProcessGroup(String id); /** - * Returns the RemoteProcessGroup with the given ID, if it exists as a child + * @param id of the group + * @return the RemoteProcessGroup with the given ID, if it exists as a child * or descendant of this ProcessGroup. This performs a recursive search of * all ProcessGroups and descendant ProcessGroups - * - * @param id - * @return */ RemoteProcessGroup findRemoteProcessGroup(String id); /** - * Returns a List of all Remote Process Groups that are children or + * @return a List of all Remote Process Groups that are children or * descendants of this ProcessGroup. This performs a recursive search of all * descendant ProcessGroups - * - * @return */ List<RemoteProcessGroup> findAllRemoteProcessGroups(); /** - * Returns the Processor with the given ID, if it exists as a child or + * @param id of the processor node + * @return the Processor with the given ID, if it exists as a child or * descendant of this ProcessGroup. 
This performs a recursive search of all * descendant ProcessGroups - * - * @param id - * @return */ ProcessorNode findProcessor(String id); /** - * Returns a List of all Processors that are children or descendants of this + * @return a List of all Processors that are children or descendants of this * ProcessGroup. This performs a recursive search of all descendant * ProcessGroups - * - * @return */ List<ProcessorNode> findAllProcessors(); /** - * Returns a List of all Labels that are children or descendants of this + * @return a List of all Labels that are children or descendants of this * ProcessGroup. This performsn a recursive search of all descendant * ProcessGroups - * - * @return */ List<Label> findAllLabels(); /** - * Returns the input port with the given ID, if it exists; otherwise returns + * @param id of the port + * @return the input port with the given ID, if it exists; otherwise returns * null. This performs a recursive search of all Input Ports and descendant * ProcessGroups - * - * @param id - * @return */ Port findInputPort(String id); /** - * Returns the input port with the given name, if it exists; otherwise - * returns null. ProcessGroups - * - * @param name - * @return + * @param name of port + * @return the input port with the given name, if it exists; otherwise + * returns null */ Port getInputPortByName(String name); /** - * Returns the output port with the given ID, if it exists; otherwise + * @param id of the port + * @return the output port with the given ID, if it exists; otherwise * returns null. This performs a recursive search of all Output Ports and * descendant ProcessGroups - * - * @param id - * @return */ Port findOutputPort(String id); /** - * Returns the output port with the given name, if it exists; otherwise - * returns null. 
- * - * @param name - * @return + * @param name of the port + * @return the output port with the given name, if it exists; otherwise + * returns null */ Port getOutputPortByName(String name); /** - * Adds the given funnel to this ProcessGroup and starts it. While other components - * do not automatically start, the funnel does by default because it is intended to be - * more of a notional component that users are unable to explicitly start and stop. - * However, there is an override available in {@link #addFunnel(Funnel, boolean)} because - * we may need to avoid starting the funnel on restart until the flow is completely - * initialized. + * Adds the given funnel to this ProcessGroup and starts it. While other + * components do not automatically start, the funnel does by default because + * it is intended to be more of a notional component that users are unable + * to explicitly start and stop. However, there is an override available in + * {@link #addFunnel(Funnel, boolean)} because we may need to avoid starting + * the funnel on restart until the flow is completely initialized. * - * @param funnel + * @param funnel to add */ void addFunnel(Funnel funnel); - + /** - * Adds the given funnel to this ProcessGroup and optionally starts the funnel. - * @param funnel - * @param autoStart + * Adds the given funnel to this ProcessGroup and optionally starts the + * funnel. 
+ * + * @param funnel to add + * @param autoStart true if should auto start */ void addFunnel(Funnel funnel, boolean autoStart); /** - * Returns a Set of all Funnels that belong to this ProcessGroup - * - * @return + * @return a Set of all Funnels that belong to this ProcessGroup */ Set<Funnel> getFunnels(); /** - * Returns the funnel with the given identifier - * - * @param id - * @return + * @param id of the funnel + * @return the funnel with the given identifier */ Funnel getFunnel(String id); /** * Removes the given funnel from this ProcessGroup * - * @param funnel + * @param funnel to remove * * @throws IllegalStateException if the funnel is not a member of this * ProcessGroup or has incoming or outgoing connections @@ -654,7 +610,7 @@ public interface ProcessGroup { * Removes all of the components whose ID's are specified within the given * {@link Snippet} from this ProcessGroup. * - * @param snippet + * @param snippet to remove * * @throws NullPointerException if argument is null * @throws IllegalStateException if any ID in the snippet refers to a @@ -663,12 +619,10 @@ public interface ProcessGroup { void remove(final Snippet snippet); /** - * Returns the Connectable with the given ID, if it exists; otherwise + * @param identifier of connectable + * @return the Connectable with the given ID, if it exists; otherwise * returns null. 
This performs a recursive search of all ProcessGroups' * input ports, output ports, funnels, processors, and remote process groups - * - * @param identifier - * @return */ Connectable findConnectable(String identifier); @@ -677,10 +631,9 @@ public interface ProcessGroup { * {@link Snippet} from this ProcessGroup into the given destination * ProcessGroup * - * @param snippet - * @param destination - * - * @throws NullPointerExcepiton if either argument is null + * @param snippet to move + * @param destination where to move + * @throws NullPointerException if either argument is null * @throws IllegalStateException if any ID in the snippet refers to a * component that is not within this ProcessGroup */ @@ -696,7 +649,7 @@ public interface ProcessGroup { * Ensures that deleting the given snippet is a valid operation at this * point in time, depending on the state of this ProcessGroup * - * @param snippet + * @param snippet to delete * * @throws IllegalStateException if deleting the Snippet is not valid at * this time @@ -708,8 +661,8 @@ public interface ProcessGroup { * operation at this point in time, depending on the state of both * ProcessGroups * - * @param snippet - * @param newProcessGroup + * @param snippet to move + * @param newProcessGroup new location * * @throws IllegalStateException if the move is not valid at this time */ http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java index c842195..db05313 100644 --- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroup.java @@ -45,13 +45,11 @@ public interface RemoteProcessGroup { void setComments(String comments); void shutdown(); - + /** - * Returns the name of this RemoteProcessGroup. The value returned will + * @return the name of this RemoteProcessGroup. The value returned will * never be null. If unable to communicate with the remote instance, the URI * of that instance may be returned instead - * - * @return */ String getName(); @@ -78,36 +76,30 @@ public interface RemoteProcessGroup { void setYieldDuration(final String yieldDuration); String getYieldDuration(); - + /** * Sets the timeout using the TimePeriod format (e.g., "30 secs", "1 min") * - * @param timePeriod - * @throws IllegalArgumentException + * @param timePeriod new period + * @throws IllegalArgumentException iae */ void setCommunicationsTimeout(String timePeriod) throws IllegalArgumentException; /** - * Returns the communications timeout in terms of the given TimeUnit - * - * @param timeUnit - * @return + * @param timeUnit unit of time to report timeout + * @return the communications timeout in terms of the given TimeUnit */ int getCommunicationsTimeout(TimeUnit timeUnit); /** - * Returns the user-configured String representation of the communications + * @return the user-configured String representation of the communications * timeout - * - * @return */ String getCommunicationsTimeout(); /** - * Indicates whether or not the RemoteProcessGroup is currently scheduled to + * @return Indicates whether or not the RemoteProcessGroup is currently scheduled to * transmit data - * - * @return */ boolean isTransmitting(); @@ -126,7 +118,7 @@ public interface RemoteProcessGroup { * Initiates communications between this instance and 
the remote instance * only for the port specified. * - * @param port + * @param port port to start */ void startTransmitting(RemoteGroupPort port); @@ -134,47 +126,38 @@ public interface RemoteProcessGroup { * Immediately terminates communications between this instance and the * remote instance only for the port specified. * - * @param port + * @param port to stop */ void stopTransmitting(RemoteGroupPort port); /** - * Indicates whether or not communications with this RemoteProcessGroup will + * @return Indicates whether or not communications with this RemoteProcessGroup will * be secure (2-way authentication) - * - * @return + * @throws org.apache.nifi.controller.exception.CommunicationsException ce */ boolean isSecure() throws CommunicationsException; /** - * Indicates whether or not communications with this RemoteProcessGroup will + * @return Indicates whether or not communications with this RemoteProcessGroup will * be secure (2-way authentication). Returns null if unknown. - * - * @return */ Boolean getSecureFlag(); /** - * Returns true if the target system has site to site enabled. Returns false - * otherwise (they don't or they have not yet responded). - * - * @return + * @return true if the target system has site to site enabled. 
Returns false + * otherwise (they don't or they have not yet responded) */ boolean isSiteToSiteEnabled(); /** - * Returns a String indicating why we are not authorized to communicate with + * @return a String indicating why we are not authorized to communicate with * the remote instance, or <code>null</code> if we are authorized - * - * @return */ String getAuthorizationIssue(); /** - * Returns the {@link EventReporter} that can be used to report any notable + * @return the {@link EventReporter} that can be used to report any notable * events - * - * @return */ EventReporter getEventReporter(); @@ -195,11 +178,10 @@ public interface RemoteProcessGroup { * Removes a port that no longer exists on the remote instance from this * RemoteProcessGroup * - * @param port + * @param port to remove */ void removeNonExistentPort(final RemoteGroupPort port); - /** * Called whenever RemoteProcessGroup is removed from the flow, so that any * resources can be cleaned up appropriately. http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java index fb4f6e0..4d7f774 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/RemoteProcessGroupPortDescriptor.java @@ -19,73 +19,53 @@ package 
org.apache.nifi.groups; public interface RemoteProcessGroupPortDescriptor { /** - * The comments as configured in the target port. - * - * @return + * @return comments as configured in the target port */ String getComments(); /** - * The number tasks that may transmit flow files to the target port - * concurrently. - * - * @return + * @return The number tasks that may transmit flow files to the target port + * concurrently */ Integer getConcurrentlySchedulableTaskCount(); /** - * The id of the target port. - * - * @return + * @return id of the target port */ String getId(); /** - * The id of the remote process group that this port resides in. - * - * @return + * @return id of the remote process group that this port resides in */ String getGroupId(); /** - * The name of the target port. - * - * @return + * @return name of the target port */ String getName(); /** - * Whether or not this remote group port is configured for transmission. - * - * @return + * @return Whether or not this remote group port is configured for transmission */ Boolean isTransmitting(); /** - * Whether or not flow file are compressed when sent to this target port. - * - * @return + * @return Whether or not flow file are compressed when sent to this target port */ Boolean getUseCompression(); /** - * Whether ot not the target port exists. - * - * @return + * @return Whether or not the target port exists */ Boolean getExists(); /** - * Whether or not the target port is running. - * - * @return + * @return Whether or not the target port is running */ Boolean isTargetRunning(); /** - * Whether or not this port has either an incoming or outgoing connection. 
- * - * @return + * @return Whether or not this port has either an incoming or outgoing connection */ Boolean isConnected(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java index 4a017ce..92091da 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepository.java @@ -30,33 +30,33 @@ public interface LogRepository { * Registers an observer so that it will be notified of all Log Messages * whose levels are at least equal to the given level. * - * @param observerIdentifier - * @param level - * @param observer + * @param observerIdentifier identifier of observer + * @param level of logs the observer wants + * @param observer the observer */ void addObserver(String observerIdentifier, LogLevel level, LogObserver observer); /** * Sets the observation level of the specified observer. * - * @param observerIdentifier - * @param level + * @param observerIdentifier identifier of observer + * @param level of logs the observer wants */ void setObservationLevel(String observerIdentifier, LogLevel level); /** * Gets the observation level for the specified observer. 
* - * @param observerIdentifier - * @return + * @param observerIdentifier identifier of observer + * @return level */ LogLevel getObservationLevel(String observerIdentifier); /** * Removes the given LogObserver from this Repository. * - * @param observerIdentifier - * @return + * @param observerIdentifier identifier of observer + * @return old log observer */ LogObserver removeObserver(String observerIdentifier); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java index aa905a8..9471ba6 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/nar/NarThreadContextClassLoader.java @@ -40,8 +40,7 @@ import org.apache.nifi.provenance.ProvenanceEventRepository; import org.apache.nifi.reporting.ReportingTask; /** - * - * @author none THREAD SAFE + * THREAD SAFE */ public class NarThreadContextClassLoader extends URLClassLoader { @@ -125,7 +124,7 @@ public class NarThreadContextClassLoader extends URLClassLoader { // contains the class or resource that we are looking for. // This locks the current Thread into the appropriate NAR ClassLoader Context. 
The framework will change // the ContextClassLoader back to the NarThreadContextClassLoader as appropriate via the - // {@link FlowEngine.beforeExecute(Thread, Runnable)} and + // {@link FlowEngine.beforeExecute(Thread, Runnable)} and // {@link FlowEngine.afterExecute(Thread, Runnable)} methods. if (desiredClassLoader instanceof NarClassLoader) { Thread.currentThread().setContextClassLoader(desiredClassLoader); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java index f08277c..8cad103 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RemoteGroupPort.java @@ -25,11 +25,11 @@ import org.apache.nifi.groups.RemoteProcessGroup; public abstract class RemoteGroupPort extends AbstractPort implements Port, RemoteDestination { - public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) { - super(id, name, processGroup, type, scheduler); - } + public RemoteGroupPort(String id, String name, ProcessGroup processGroup, ConnectableType type, ProcessScheduler scheduler) { + super(id, name, processGroup, type, scheduler); + } - public abstract RemoteProcessGroup getRemoteProcessGroup(); + public abstract RemoteProcessGroup getRemoteProcessGroup(); public abstract 
TransferDirection getTransferDirection(); http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java index 4afdfb7..b60e789 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/RootGroupPort.java @@ -42,36 +42,36 @@ public interface RootGroupPort extends Port { * and returns a {@link PortAuthorizationResult} indicating why the user is * unauthorized if this assumption fails * - * @param dn - * @return + * @param dn dn of user + * @return result */ PortAuthorizationResult checkUserAuthorization(String dn); /** * Receives data from the given stream * - * @param peer - * @param serverProtocol - * @param requestHeaders + * @param peer peer + * @param serverProtocol protocol + * @param requestHeaders headers * * @return the number of FlowFiles received - * @throws org.apache.nifi.remote.exception.NotAuthorizedException - * @throws org.apache.nifi.remote.exception.BadRequestException - * @throws org.apache.nifi.remote.exception.RequestExpiredException + * @throws org.apache.nifi.remote.exception.NotAuthorizedException nae + * @throws org.apache.nifi.remote.exception.BadRequestException bre + * @throws org.apache.nifi.remote.exception.RequestExpiredException ree */ int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> 
requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; /** * Transfers data to the given stream * - * @param peer - * @param requestHeaders - * @param serverProtocol + * @param peer peer + * @param requestHeaders headers + * @param serverProtocol protocol * * @return the number of FlowFiles transferred - * @throws org.apache.nifi.remote.exception.NotAuthorizedException - * @throws org.apache.nifi.remote.exception.BadRequestException - * @throws org.apache.nifi.remote.exception.RequestExpiredException + * @throws org.apache.nifi.remote.exception.NotAuthorizedException nae + * @throws org.apache.nifi.remote.exception.BadRequestException bre + * @throws org.apache.nifi.remote.exception.RequestExpiredException ree */ int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java index 0118534..6fbb88c 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/remote/protocol/ServerProtocol.java @@ -26,7 +26,6 @@ import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.remote.VersionedRemoteResource; 
import org.apache.nifi.remote.cluster.NodeInformant; import org.apache.nifi.remote.codec.FlowFileCodec; -import org.apache.nifi.remote.exception.BadRequestException; import org.apache.nifi.remote.exception.HandshakeException; import org.apache.nifi.remote.exception.ProtocolException; @@ -34,7 +33,7 @@ public interface ServerProtocol extends VersionedRemoteResource { /** * - * @param rootGroup + * @param rootGroup group */ void setRootProcessGroup(ProcessGroup rootGroup); @@ -45,25 +44,23 @@ public interface ServerProtocol extends VersionedRemoteResource { * NodeInformant is supported. Otherwise, throws * UnsupportedOperationException * - * @param nodeInformant + * @param nodeInformant informant */ void setNodeInformant(NodeInformant nodeInformant); /** * Receives the handshake from the Peer * - * @param peer - * @throws IOException - * @throws HandshakeException + * @param peer peer + * @throws IOException ioe + * @throws HandshakeException he */ void handshake(Peer peer) throws IOException, HandshakeException; /** - * Returns <code>true</code> if the handshaking process was completed + * @return <code>true</code> if the handshaking process was completed * successfully, <code>false</code> if either the handshaking process has * not happened or the handshake failed - * - * @return */ boolean isHandshakeSuccessful(); @@ -71,61 +68,61 @@ public interface ServerProtocol extends VersionedRemoteResource { * Negotiates the FlowFileCodec that is to be used for transferring * FlowFiles * - * @param peer - * @return - * @throws IOException - * @throws BadRequestException + * @param peer peer + * @return the codec to use + * @throws IOException ioe + * @throws org.apache.nifi.remote.exception.ProtocolException pe */ FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException; /** - * Returns the codec that has already been negotiated by this Protocol, if - * any. 
- * - * @return + * @return the codec that has already been negotiated by this Protocol, if + * any */ FlowFileCodec getPreNegotiatedCodec(); /** * Reads the Request Type of the next request from the Peer * + * @param peer peer * @return the RequestType that the peer would like to happen - or null, if * no data available + * @throws java.io.IOException ioe */ RequestType getRequestType(Peer peer) throws IOException; /** * Sends FlowFiles to the specified peer * - * @param peer - * @param context - * @param session - * @param codec + * @param peer peer + * @param context context + * @param session session + * @param codec codec * * @return the number of FlowFiles transferred + * @throws java.io.IOException ioe + * @throws org.apache.nifi.remote.exception.ProtocolException pe */ int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; /** * Receives FlowFiles from the specified peer * - * @param peer - * @param context - * @param session - * @param codec - * @throws IOException + * @param peer peer + * @param context context + * @param session session + * @param codec codec + * @throws IOException ioe * * @return the number of FlowFiles received - * @throws ProtocolException + * @throws ProtocolException pe */ int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException; /** - * Returns the number of milliseconds after a request is received for which + * @return the number of milliseconds after a request is received for which * the request is still valid. A valid of 0 indicates that the request will - * not expire. - * - * @return + * not expire */ long getRequestExpiration(); @@ -133,7 +130,8 @@ public interface ServerProtocol extends VersionedRemoteResource { * Sends a list of all nodes in the cluster to the specified peer. 
If not in * a cluster, sends info about itself * - * @param peer + * @param peer peer + * @throws java.io.IOException ioe */ void sendPeerList(Peer peer) throws IOException; http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java index 1d723b5..b2feab5 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java @@ -71,18 +71,22 @@ public final class StandardConnection implements Connection { hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode(); } + @Override public ProcessGroup getProcessGroup() { return processGroup.get(); } + @Override public String getIdentifier() { return id; } + @Override public String getName() { return name.get(); } + @Override public void setName(final String name) { this.name.set(name); } @@ -97,10 +101,12 @@ public final class StandardConnection implements Connection { this.bendPoints.set(Collections.unmodifiableList(new ArrayList<>(position))); } + @Override public int getLabelIndex() { return labelIndex.get(); } + @Override public void setLabelIndex(final int labelIndex) { this.labelIndex.set(labelIndex); } @@ -115,22 +121,27 @@ public final class StandardConnection implements Connection { this.zIndex.set(zIndex); } + @Override public Connectable getSource() { 
return source; } + @Override public Connectable getDestination() { return destination.get(); } + @Override public Collection<Relationship> getRelationships() { return relationships.get(); } + @Override public FlowFileQueue getFlowFileQueue() { return flowFileQueue; } + @Override public void setProcessGroup(final ProcessGroup newGroup) { final ProcessGroup currentGroup = this.processGroup.get(); try { @@ -141,6 +152,7 @@ public final class StandardConnection implements Connection { } } + @Override public void setRelationships(final Collection<Relationship> newRelationships) { final Collection<Relationship> currentRelationships = relationships.get(); if (currentRelationships.equals(newRelationships)) { @@ -160,6 +172,7 @@ public final class StandardConnection implements Connection { } } + @Override public void setDestination(final Connectable newDestination) { final Connectable previousDestination = destination.get(); if (previousDestination.equals(newDestination)) { @@ -223,7 +236,7 @@ public final class StandardConnection implements Connection { * consumers. This allows us to ensure that the Connection is not deleted * during the middle of a Session commit. 
* - * @param flowFile + * @param flowFile to add */ @Override public void enqueue(final FlowFileRecord flowFile) { http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java index e1d80b0..3041ada 100644 --- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java +++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FileSystemSwapManager.java @@ -83,7 +83,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { public static final int MINIMUM_SWAP_COUNT = 10000; private static final Pattern SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap"); private static final Pattern TEMP_SWAP_FILE_PATTERN = Pattern.compile("\\d+-.+\\.swap\\.part"); - + public static final int SWAP_ENCODING_VERSION = 6; public static final String EVENT_CATEGORY = "Swap FlowFiles"; @@ -99,14 +99,14 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private final long swapOutMillis; private final int swapOutThreadCount; - private ContentClaimManager claimManager; // effectively final + private ContentClaimManager claimManager; // effectively final private static final Logger logger = LoggerFactory.getLogger(FileSystemSwapManager.class); public FileSystemSwapManager() { final NiFiProperties properties = NiFiProperties.getInstance(); final Path flowFileRepoPath = 
properties.getFlowFileRepositoryPath(); - + this.storageDirectory = flowFileRepoPath.resolve("swap").toFile(); if (!storageDirectory.exists() && !storageDirectory.mkdirs()) { throw new RuntimeException("Cannot create Swap Storage directory " + storageDirectory.getAbsolutePath()); @@ -138,6 +138,7 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } + @Override public synchronized void start(final FlowFileRepository flowFileRepository, final QueueProvider connectionProvider, final ContentClaimManager claimManager, final EventReporter eventReporter) { this.claimManager = claimManager; this.flowFileRepository = flowFileRepository; @@ -244,7 +245,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { return deserializeFlowFiles(in, numRecords, queue, swapEncodingVersion, false, claimManager); } - static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException { + static List<FlowFileRecord> deserializeFlowFiles(final DataInputStream in, final int numFlowFiles, final FlowFileQueue queue, + final int serializationVersion, final boolean incrementContentClaims, final ContentClaimManager claimManager) throws IOException { final List<FlowFileRecord> flowFiles = new ArrayList<>(); for (int i = 0; i < numFlowFiles; i++) { // legacy encoding had an "action" because it used to be couple with FlowFile Repository code @@ -451,15 +453,15 @@ public class FileSystemSwapManager implements FlowFileSwapManager { } } catch (final EOFException eof) { error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Corrupt Swap File; will remove this Swap File: " + swapFile); - - if ( !swapFile.delete() ) { + + if (!swapFile.delete()) { warn("Failed to remove corrupt Swap File " + swapFile + "; This file should be cleaned up manually"); } } catch (final 
FileNotFoundException fnfe) { error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to: Could not find Swap File " + swapFile); } catch (final Exception e) { error("Failed to Swap In FlowFiles for " + flowFileQueue + " due to " + e, e); - + if (swapFile != null) { queue.add(swapFile); } @@ -475,27 +477,27 @@ public class FileSystemSwapManager implements FlowFileSwapManager { private void error(final String error, final Throwable t) { error(error); - if ( logger.isDebugEnabled() ) { + if (logger.isDebugEnabled()) { logger.error("", t); } } - + private void error(final String error) { logger.error(error); - if ( eventReporter != null ) { + if (eventReporter != null) { eventReporter.reportEvent(Severity.ERROR, EVENT_CATEGORY, error); } } - + private void warn(final String warning) { logger.warn(warning); - if ( eventReporter != null ) { + if (eventReporter != null) { eventReporter.reportEvent(Severity.WARNING, EVENT_CATEGORY, warning); } } - - + private class SwapOutTask implements Runnable { + private final BlockingQueue<FlowFileQueue> connectionQueue; public SwapOutTask(final BlockingQueue<FlowFileQueue> connectionQueue) { @@ -527,8 +529,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { recordsSwapped = serializeFlowFiles(toSwap, flowFileQueue, swapLocation, fos); fos.getFD().sync(); } - - if ( swapTempFile.renameTo(swapFile) ) { + + if (swapTempFile.renameTo(swapFile)) { flowFileRepository.swapFlowFilesOut(toSwap, flowFileQueue, swapLocation); } else { error("Failed to swap out FlowFiles from " + flowFileQueue + " due to: Unable to rename swap file from " + swapTempFile + " to " + swapFile); @@ -563,8 +565,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { * Recovers FlowFiles from all Swap Files, returning the largest FlowFile ID * that was recovered. 
* - * @param queueProvider - * @return + * @param queueProvider provider + * @return the largest FlowFile ID that was recovered */ @Override public long recoverSwappedFlowFiles(final QueueProvider queueProvider, final ContentClaimManager claimManager) { @@ -591,16 +593,16 @@ public class FileSystemSwapManager implements FlowFileSwapManager { long maxRecoveredId = 0L; for (final File swapFile : swapFiles) { - if ( TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches() ) { - if ( swapFile.delete() ) { + if (TEMP_SWAP_FILE_PATTERN.matcher(swapFile.getName()).matches()) { + if (swapFile.delete()) { logger.info("Removed incomplete/temporary Swap File " + swapFile); } else { warn("Failed to remove incomplete/temporary Swap File " + swapFile + "; this file should be cleaned up manually"); } - + continue; } - + // read record to disk via the swap file try (final InputStream fis = new FileInputStream(swapFile); final InputStream bufferedIn = new BufferedInputStream(fis); @@ -618,7 +620,8 @@ public class FileSystemSwapManager implements FlowFileSwapManager { final String connectionId = in.readUTF(); final FlowFileQueue queue = queueMap.get(connectionId); if (queue == null) { - error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + connectionId + " and that Connection does not exist"); + error("Cannot recover Swapped FlowFiles from Swap File " + swapFile + " because the FlowFiles belong to a Connection with ID " + + connectionId + " and that Connection does not exist"); continue; }
