http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java new file mode 100644 index 0000000..1845ed2 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/PropertyValue.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.components; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * <p> + * A PropertyValue provides a mechanism whereby the currently configured value + * of a processor property can be obtained in different forms. 
+ * </p> + */ +public interface PropertyValue { + + /** + * @return the raw property value as a string + */ + public String getValue(); + + /** + * @return an integer representation of the property value, or + * <code>null</code> if not set + * @throws NumberFormatException if not able to parse + */ + public Integer asInteger(); + + /** + * @return a Long representation of the property value, or <code>null</code> + * if not set + * @throws NumberFormatException if not able to parse + */ + public Long asLong(); + + /** + * @return a Boolean representation of the property value, or + * <code>null</code> if not set + */ + public Boolean asBoolean(); + + /** + * @return a Float representation of the property value, or + * <code>null</code> if not set + * @throws NumberFormatException if not able to parse + */ + public Float asFloat(); + + /** + * @return a Double representation of the property value, or + * <code>null</code> if not set + * @throws NumberFormatException if not able to parse + */ + public Double asDouble(); + + /** + * @param timeUnit specifies the TimeUnit to convert the time duration into + * @return a Long value representing the value of the configured time period + * in terms of the specified TimeUnit; if the property is not set, returns + * <code>null</code> + */ + public Long asTimePeriod(TimeUnit timeUnit); + + /** + * + * @param dataUnit specifies the DataUnit to convert the data size into + * @return a Double value representing the value of the configured data size + * in terms of the specified DataUnit; if the property is not set, returns + * <code>null</code> + */ + public Double asDataSize(DataUnit dataUnit); + + /** + * @return the ControllerService whose identifier is the raw value of + * <code>this</code>, or <code>null</code> if either the value is not set or + * the value does not identify a ControllerService + */ + public ControllerService asControllerService(); + + /** + * @param <T> the generic type of the controller service + * @param 
serviceType the class of the Controller Service + * @return the ControllerService whose identifier is the raw value of + * <code>this</code>, or <code>null</code> if either the value is not set or + * the value does not identify a ControllerService. The object returned by + * this method is explicitly cast to the type specified, if the type specified + * is valid. Otherwise, throws an IllegalArgumentException + * + * @throws IllegalArgumentException if the value of <code>this</code> points + * to a ControllerService but that service is not of type + * <code>serviceType</code> or if <code>serviceType</code> references a + * class that is not an interface + */ + public <T extends ControllerService> T asControllerService(Class<T> serviceType) throws IllegalArgumentException; + + /** + * @return <code>true</code> if the user has configured a value, or if the + * {@link PropertyDescriptor} for the associated property has a default + * value, <code>false</code> otherwise + */ + public boolean isSet(); + + /** + * <p> + * Replaces values in the Property Value using the Attribute Expression + * Language; a PropertyValue with the new value is then returned, supporting + * call chaining. + * </p> + * + * @return a PropertyValue with the new value, supporting call + * chaining + * + * @throws ProcessException if the Query cannot be compiled or evaluating + * the query against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions() throws ProcessException; + + /** + * <p> + * Replaces values in the Property Value using the Attribute Expression + * Language; a PropertyValue with the new value is then returned, supporting + * call chaining. 
+ * </p> + * + * @param flowFile to evaluate attributes of + * @return a PropertyValue with the new value, supporting call + * chaining + * + * @throws ProcessException if the Query cannot be compiled or evaluating + * the query against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile) throws ProcessException; + + /** + * <p> + * Replaces values in the Property Value using the Attribute Expression + * Language. The supplied decorator is then given a chance to decorate the + * value, and a PropertyValue with the new value is then returned, + * supporting call chaining. + * </p> + * + * @param decorator the decorator that is given a chance to + * decorate the evaluated value + * @return a PropertyValue with the new value, supporting + * call chaining + * + * @throws ProcessException if the Query cannot be compiled or evaluating + * the query against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(AttributeValueDecorator decorator) throws ProcessException; + + /** + * <p> + * Replaces values in the Property Value using the Attribute Expression + * Language. The supplied decorator is then given a chance to decorate the + * value, and a PropertyValue with the new value is then returned, + * supporting call chaining. + * </p> + * + * @param flowFile to evaluate expressions against + * @param decorator the decorator that is given a chance to + * decorate the evaluated value + * + * @return a PropertyValue with the new value, supporting + * call chaining + * + * @throws ProcessException if the Query cannot be compiled or evaluating + * the query against the given attributes causes an Exception to be thrown + */ + public PropertyValue evaluateAttributeExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException; +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java new file mode 100644 index 0000000..a1dcf43 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationContext.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.components; + +import java.util.Map; + +import org.apache.nifi.controller.ControllerService; +import org.apache.nifi.controller.ControllerServiceLookup; +import org.apache.nifi.expression.ExpressionLanguageCompiler; + +public interface ValidationContext { + + /** + * @return the {@link ControllerServiceLookup} which can be used to obtain + * Controller Services + */ + ControllerServiceLookup getControllerServiceLookup(); + + /** + * @param controllerService to look up the validation context of + * @return a ValidationContext that is appropriate for validating the given + * {@link ControllerService} + */ + ValidationContext getControllerServiceValidationContext(ControllerService controllerService); + + /** + * @return a new {@link ExpressionLanguageCompiler} that can be used to + * compile &amp; evaluate Attribute Expressions + */ + ExpressionLanguageCompiler newExpressionLanguageCompiler(); + + /** + * @param property being validated + * @return a PropertyValue that encapsulates the value configured for the + * given PropertyDescriptor + */ + PropertyValue getProperty(PropertyDescriptor property); + + /** + * @param value to make a PropertyValue object for + * @return a PropertyValue that represents the given value + */ + PropertyValue newPropertyValue(String value); + + /** + * @return a Map of all configured Properties + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * @return the currently configured Annotation Data + */ + String getAnnotationData(); + + /** + * There are times when the framework needs to consider a component valid, + * even if it references an invalid ControllerService. This method will + * return <code>false</code> if the component is to be considered valid even + * if the given Controller Service is referenced and is invalid. 
+ * + * @param service to check if validation is required + * @return <code>false</code> if the component is to be considered valid + * even if the given Controller Service is referenced and is invalid + */ + boolean isValidationRequired(ControllerService service); + + /** + * @param value to test whether expression language is present + * @return <code>true</code> if the given value contains a NiFi Expression + * Language expression, <code>false</code> if it does not + */ + boolean isExpressionLanguagePresent(String value); + + /** + * @param propertyName to test whether expression language is supported + * @return <code>true</code> if the property with the given name supports + * the NiFi Expression Language, <code>false</code> if the property does not + * support the Expression Language or is not a valid property name + */ + boolean isExpressionLanguageSupported(String propertyName); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java new file mode 100644 index 0000000..e0beec8 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/ValidationResult.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.components; + +import java.util.Objects; + +/** + * + * Immutable - thread safe + * + */ +public class ValidationResult { + + private final String subject; + private final String input; + private final String explanation; + private final boolean valid; + + private ValidationResult(final Builder builder) { + this.subject = builder.subject; + this.input = builder.input; + this.explanation = builder.explanation; + this.valid = builder.valid; + } + + /** + * @return true if current result is valid; false otherwise + */ + public boolean isValid() { + return this.valid; + } + + /** + * @return the input value that was tested for validity; may be <code>null</code> + */ + public String getInput() { + return this.input; + } + + /** + * @return an explanation of the validation result + */ + public String getExplanation() { + return this.explanation; + } + + /** + * @return the item being validated + */ + public String getSubject() { + return this.subject; + } + + @Override + public int hashCode() { + int hash = 3; + hash = 79 * hash + Objects.hashCode(this.subject); + hash = 79 * hash + Objects.hashCode(this.input); + hash = 79 * hash + Objects.hashCode(this.explanation); + hash = 79 * hash + (this.valid ? 
1 : 0); + return hash; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ValidationResult other = (ValidationResult) obj; + if (!Objects.equals(this.subject, other.subject)) { + return false; + } + if (!Objects.equals(this.input, other.input)) { + return false; + } + if (!Objects.equals(this.explanation, other.explanation)) { + return false; + } + if (this.valid != other.valid) { + return false; + } + return true; + } + + @Override + public String toString() { + if (input == null) { + return String.format("'%s' is %s because %s", subject, (valid ? "valid" : "invalid"), explanation); + } else { + return String.format("'%s' validated against '%s' is %s because %s", subject, input, (valid ? "valid" : "invalid"), explanation); + } + } + + public static final class Builder { + + private boolean valid = false; + private String input = null; + private String explanation = ""; + private String subject = ""; + + /** + * Defaults to false + * + * @param valid true if is valid; false otherwise + * @return the builder + */ + public Builder valid(final boolean valid) { + this.valid = valid; + return this; + } + + /** + * Defaults to <code>null</code> + * + * @param input what was validated; a <code>null</code> argument is ignored + * @return the builder + */ + public Builder input(final String input) { + if (null != input) { + this.input = input; + } + return this; + } + + /** + * Defaults to empty string + * + * @param explanation of validation result; a <code>null</code> argument is ignored + * @return the builder + */ + public Builder explanation(final String explanation) { + if (null != explanation) { + this.explanation = explanation; + } + return this; + } + + /** + * Defaults to empty string + * + * @param subject the thing that was validated; a <code>null</code> argument is ignored + * @return the builder + */ + public Builder subject(final String subject) { + if (null != subject) { + this.subject = subject; + } + return this; + } + + public ValidationResult build() { + return new 
ValidationResult(this); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/components/Validator.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/components/Validator.java b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java new file mode 100644 index 0000000..a12b532 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/components/Validator.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.components; + +/** + * Validates an input string for a named subject and reports the outcome as a {@link ValidationResult} + */ +public interface Validator { + + /** + * Validator object providing validation behavior in which validation always + * fails, explaining that the subject is not a supported property + */ + Validator INVALID = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + return new ValidationResult.Builder().subject(subject).explanation(String.format("'%s' is not a supported property", subject)).input(input).build(); + } + }; + + /** + * Validator object providing validation behavior in which validation always + * passes + */ + Validator VALID = new Validator() { + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } + }; + + /** + * @param subject what is being validated + * @param input the string to be validated + * @param context the ValidationContext to use when validating properties + * @return the ValidationResult describing the outcome of the validation + * @throws NullPointerException if given input is null + */ + ValidationResult validate(String subject, String input, ValidationContext context); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java new file mode 100644 index 0000000..cd3b9bd --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/AbstractControllerService.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.Map; + +import org.apache.nifi.components.AbstractConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.controller.annotation.OnConfigured; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.ProcessorInitializationContext; +import org.apache.nifi.reporting.InitializationException; + +public abstract class AbstractControllerService extends AbstractConfigurableComponent implements ControllerService { + + private String identifier; + private ControllerServiceLookup serviceLookup; + private volatile ConfigurationContext configContext; + private ComponentLog logger; + + @Override + public final void initialize(final ControllerServiceInitializationContext context) throws InitializationException { + this.identifier = context.getIdentifier(); + serviceLookup = context.getControllerServiceLookup(); + logger = context.getLogger(); + init(context); + } + + @Override + public String getIdentifier() { + return identifier; + } + + @OnConfigured + public void onConfigurationChange(final ConfigurationContext context) { + this.configContext = context; + } + + /** + * @param descriptor to retrieve value of + * @return the currently configured value for the given + * {@link PropertyDescriptor} + */ + protected final 
PropertyValue getProperty(final PropertyDescriptor descriptor) { + return configContext.getProperty(descriptor); + } + + /** + * @return an unmodifiable map of all configured properties for this + * {@link ControllerService} + */ + protected final Map<PropertyDescriptor, String> getProperties() { + return configContext.getProperties(); + } + + /** + * @return the {@link ControllerServiceLookup} obtained from the context passed to the + * {@link #initialize(ControllerServiceInitializationContext)} method + */ + protected final ControllerServiceLookup getControllerServiceLookup() { + return serviceLookup; + } + + /** + * Provides a mechanism by which subclasses can perform initialization of + * the Controller Service; invoked by <code>initialize</code> after the identifier, lookup and logger are stored + * + * @param config the initialization context + * @throws InitializationException if unable to init + */ + protected void init(final ControllerServiceInitializationContext config) throws InitializationException { + } + + /** + * @return the logger that has been provided to the component by the + * framework in its initialize method + */ + protected ComponentLog getLogger() { + return logger; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java new file mode 100644 index 0000000..03965d4 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ConfigurationContext.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; + +/** + * This context is passed to ControllerServices and Reporting Tasks in order + * to expose their configuration to them. + */ +public interface ConfigurationContext { + + /** + * @param property the descriptor of the property to retrieve + * @return the configured value for the given property + */ + PropertyValue getProperty(PropertyDescriptor property); + + /** + * @return an unmodifiable map of all configured properties for this + * {@link ControllerService} + */ + Map<PropertyDescriptor, String> getProperties(); + + /** + * @return a String representation of the scheduling period, or <code>null</code> if + * the component does not have a scheduling period (e.g., for ControllerServices) + */ + String getSchedulingPeriod(); + + /** + * Returns the amount of time, in the given {@link TimeUnit}, that will + * elapse between the return of one execution of the + * component's <code>onTrigger</code> method and + * the time at which the method is invoked again. 
This method will return + * null if the component does not have a scheduling period (e.g., for ControllerServices) + * + * @param timeUnit unit of time for scheduling + * @return period of time or <code>null</code> if component does not have a scheduling + * period + */ + Long getSchedulingPeriod(TimeUnit timeUnit); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java new file mode 100644 index 0000000..a77c69d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerService.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller; + +import org.apache.nifi.components.ConfigurableComponent; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.reporting.ReportingTask; + +/** + * <p> + * This interface provides a mechanism for creating services that are shared + * among all {@link Processor}s, {@link ReportingTask}s, and other + * {@code ControllerService}s. + * </p> + * + * <p> + * <code>ControllerService</code>s are discovered using Java's + * <code>ServiceLoader</code> mechanism. As a result, all implementations must + * follow these rules: + * + * <ul> + * <li>The implementation must implement this interface.</li> + * <li>The implementation must have a file named + * org.apache.nifi.controller.ControllerService located within the jar's + * <code>META-INF/services</code> directory. This file contains a list of + * fully-qualified class names of all <code>ControllerService</code>s in the + * jar, one-per-line.</li> + * <li>The implementation must support a default constructor.</li> + * </ul> + * </p> + * + * <p> + * <b>All implementations of this interface must be thread-safe.</b> + * </p> + * + * <h2>Accessing Controller Services</h2> + * <p> + * A ControllerService is accessible only through its interface. The framework + * provides access to a ControllerService through two different mechanisms: + * <ul> + * <li> + * A {@link PropertyDescriptor} can be created via the + * {@link PropertyDescriptor.Builder} after calling the + * {@link PropertyDescriptor.Builder#identifiesControllerService(Class) identifiesControllerService(Class)} + * method and then the ControllerService is accessed via + * {@link PropertyValue#asControllerService(Class)} method. 
+ * <p> + * For example: + * </p> + * <p> + * <code><pre> + * public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor.Builder() + * .name("My Property") + * .description("Example Property") + * .identifiesControllerService( MyControllerServiceInterface.class ) + * .build(); + * + * ... + * public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + * // Obtain the user-selected controller service + * final MyControllerServiceInterface service = context.getProperty(MY_PROPERTY).asControllerService( MyControllerServiceInterface.class ); + * ... + * } + * + * </pre></code></p> + * </li> + * <li>A Controller Service can be obtained via a + * {@link ControllerServiceLookup}. This lookup may be obtained, for example, + * from the {@link ProcessContext} that is provided to a {@link Processor}'s + * {@link Processor#onTrigger(ProcessContext, ProcessSessionFactory) onTrigger} + * method. + * <p> + * For example: + * </p> + * <p> + * <code><pre> + * public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { + * final MyControllerServiceInterface service = (MyControllerServiceInterface) context.getControllerServiceLookup().getControllerService("service_identifier"); + * } + * </pre></code></p> + * </li> + * </ul> + * </p> + * + * <h2>Defining a Controller Service</h2> + * <p> + * Note in both of the examples above, that the Controller Service was accessed + * only by its interface, and this interface extends ControllerService. If we + * have an implementation named MyServiceImpl, for example, that implements + * MyControllerServiceInterface, we cannot, in either case, attempt to cast the + * ControllerService to the desired implementation. Doing so will result in a + * {@link ClassCastException}. 
This is by design and is done for the following + * reasons: + * + * <ul> + * <li> + * It is a good coding practice to implement such a service as an interface in + * general. + * </li> + * <li> + * A Controller Service can be referenced from different NiFi Archives (NARs). + * This means that the Controller Service may be defined in one ClassLoader and + * referenced from another unrelated ClassLoader. In order to account for this, + * NiFi will change the current thread's ClassLoader as appropriate when + * entering the Controller Service's code and revert back to the previous + * ClassLoader after exiting the Controller Service's code. + * </li> + * </ul> + * </p> + * + * <h2>Controller Services and NARs</h2> + * <p> + * Due to the fact that a Controller Service may be referenced from a different + * NAR than the one in which the implementation lives, it is crucial that both + * the Controller Service's interface and the code referencing the interface + * inherit from the same ClassLoader. This is accomplished by ensuring that the + * NAR that contains the Controller Service interface is the parent (or + * ancestor) of the NAR that references the Controller Service interface. + * </p> + * + * <p> + * Typically, this is done by creating a NAR structure as follows: + * <pre> + * + my-services-api-nar + * +--- service-X-implementation-nar + * +--- service-Y-implementation-nar + * +--- service-Z-implementation-nar + * +--- processor-A-nar + * +--- processor-B-nar + * </pre> + * </p> + * + * <p> + * In this case, the {@code MyControllerServiceInterface} interface, and any + * other Controller Service interfaces, will be defined in the + * {@code my-services-api-nar} NAR. Implementations are then encapsulated within + * the {@code service-X-implementation-nar}, + * {@code service-Y-implementation-nar}, and + * {@code service-Z-implementation-nar} NARs. 
All Controller Services and all + * Processors defined in these NARs are able to reference any other Controller + * Services whose interfaces are provided in the {@code my-services-api-nar} + * NAR. + * </p> + * + * <p> + * For more information on NARs, see the NiFi Developer Guide. + * </p> + */ +public interface ControllerService extends ConfigurableComponent { + + /** + * Provides the Controller Service with access to objects that may be of use + * throughout the life of the service. This method will be called before any + * properties are set + * + * @param context the initialization context + * @throws org.apache.nifi.reporting.InitializationException if unable to init + */ + void initialize(ControllerServiceInitializationContext context) throws InitializationException; + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java new file mode 100644 index 0000000..6fcee0c --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceInitializationContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import org.apache.nifi.logging.ComponentLog; + +public interface ControllerServiceInitializationContext { + + /** + * @return the identifier associated with the {@link ControllerService} with + * which this context is associated + */ + String getIdentifier(); + + /** + * @return the {@link ControllerServiceLookup} which can be used to obtain + * Controller Services + */ + ControllerServiceLookup getControllerServiceLookup(); + + /** + * @return a logger that can be used to log important events in a standard + * way and generate bulletins when appropriate + */ + ComponentLog getLogger(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java new file mode 100644 index 0000000..f5300b1 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/ControllerServiceLookup.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.Set; + +public interface ControllerServiceLookup { + + /** + * @param serviceIdentifier of controller service + * @return the ControllerService that is registered with the given + * identifier + */ + ControllerService getControllerService(String serviceIdentifier); + + /** + * @param serviceIdentifier identifier of service to check + * @return <code>true</code> if the Controller Service with the given + * identifier is enabled, <code>false</code> otherwise. If the given + * identifier is not known by this ControllerServiceLookup, returns + * <code>false</code> + */ + boolean isControllerServiceEnabled(String serviceIdentifier); + + /** + * @param serviceIdentifier identifier of service to check + * @return <code>true</code> if the Controller Service with the given + * identifier has been enabled but is still in the transitioning state, + * otherwise returns <code>false</code>. If the given identifier is not + * known by this ControllerServiceLookup, returns <code>false</code> + */ + boolean isControllerServiceEnabling(String serviceIdentifier); + + /** + * @param service service to check + * @return <code>true</code> if the given Controller Service is enabled, + * <code>false</code> otherwise. 
If the given Controller Service is not + * known by this ControllerServiceLookup, returns <code>false</code> + */ + boolean isControllerServiceEnabled(ControllerService service); + + /** + * @param serviceType type of service to get identifiers for + * @return the set of all Controller Service Identifiers whose Controller + * Service is of the given type. + * @throws IllegalArgumentException if the given class is not an interface + */ + Set<String> getControllerServiceIdentifiers(Class<? extends ControllerService> serviceType) throws IllegalArgumentException; + + /** + * @param serviceIdentifier identifier to look up + * @return the name of the Controller service with the given identifier. If + * no service can be found with this identifier, returns {@code null} + */ + String getControllerServiceName(String serviceIdentifier); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java new file mode 100644 index 0000000..e1baeb7 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/FlowFileQueue.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.Collection; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.controller.repository.FlowFileRecord; +import org.apache.nifi.flowfile.FlowFilePrioritizer; +import org.apache.nifi.processor.FlowFileFilter; +import org.apache.nifi.processor.QueueSize; + +public interface FlowFileQueue { + + /** + * @return the unique identifier for this FlowFileQueue + */ + String getIdentifier(); + + /** + * @return list of processing priorities for this queue + */ + List<FlowFilePrioritizer> getPriorities(); + + /** + * @return the minimum number of FlowFiles that must be present in order for + * FlowFiles to begin being swapped out of the queue + */ + int getSwapThreshold(); + + /** + * Resets the comparator used by this queue to maintain order. + * + * @param newPriorities the ordered list of prioritizers to use to determine + * order within this queue. + * @throws NullPointerException if arg is null + */ + void setPriorities(List<FlowFilePrioritizer> newPriorities); + + /** + * Establishes this queue's preferred maximum work load. 
+ * + * @param maxQueueSize the maximum number of flow files this processor + * recommends having in its work queue at any one time + */ + void setBackPressureObjectThreshold(long maxQueueSize); + + /** + * @return maximum number of flow files that should be queued up at any one + * time + */ + long getBackPressureObjectThreshold(); + + /** + * @param maxDataSize Establishes this queue's preferred maximum data size. + */ + void setBackPressureDataSizeThreshold(String maxDataSize); + + /** + * @return maximum data size that should be queued up at any one time + */ + String getBackPressureDataSizeThreshold(); + + QueueSize size(); + + /** + * @return total size in bytes of the queue flow file's content + */ + long contentSize(); + + /** + * @return true if no items queue; false otherwise + */ + boolean isEmpty(); + + /** + * @return true if the active queue is empty; false otherwise. The Active + * queue contains those FlowFiles that can be processed immediately and does + * not include those FlowFiles that have been swapped out or are currently + * being processed + */ + boolean isActiveQueueEmpty(); + + QueueSize getActiveQueueSize(); + + /** + * Returns a QueueSize that represents all FlowFiles that are 'unacknowledged'. A FlowFile + * is considered to be unacknowledged if it has been pulled from the queue by some component + * but the session that pulled the FlowFile has not yet been committed or rolled back. + * + * @return a QueueSize that represents all FlowFiles that are 'unacknowledged'. 
+ */ + QueueSize getUnacknowledgedQueueSize(); + + void acknowledge(FlowFileRecord flowFile); + + void acknowledge(Collection<FlowFileRecord> flowFiles); + + /** + * @return true if maximum queue size has been reached or exceeded; false + * otherwise + */ + boolean isFull(); + + /** + * places the given file into the queue + * + * @param file to place into queue + */ + void put(FlowFileRecord file); + + /** + * places the given files into the queue + * + * @param files to place into queue + */ + void putAll(Collection<FlowFileRecord> files); + + /** + * Removes all records from the internal swap queue and returns them. + * + * @return all removed records from internal swap queue + */ + List<FlowFileRecord> pollSwappableRecords(); + + /** + * Restores the records from swap space into this queue, adding the records + * that have expired to the given set instead of enqueuing them. + * + * @param records that were swapped in + */ + void putSwappedRecords(Collection<FlowFileRecord> records); + + /** + * Updates the internal counters of how much data is queued, based on + * swapped data that is being restored. 
+ * + * @param numRecords count of records swapped in + * @param contentSize total size of records being swapped in + */ + void incrementSwapCount(int numRecords, long contentSize); + + /** + * @return the number of FlowFiles that are enqueued and not swapped + */ + int unswappedSize(); + + int getSwapRecordCount(); + + int getSwapQueueSize(); + + /** + * @param expiredRecords expired records + * @return the next flow file on the queue; null if empty + */ + FlowFileRecord poll(Set<FlowFileRecord> expiredRecords); + + /** + * @param maxResults limits how many results can be polled + * @param expiredRecords for expired records + * @return the next flow files on the queue up to the max results; null if + * empty + */ + List<FlowFileRecord> poll(int maxResults, Set<FlowFileRecord> expiredRecords); + + /** + * Drains flow files from the given source queue into the given destination + * list. + * + * @param sourceQueue queue to drain from + * @param destination Collection to drain to + * @param maxResults max number to drain + * @param expiredRecords for expired records + * @return size (bytes) of flow files drained from queue + */ + long drainQueue(Queue<FlowFileRecord> sourceQueue, List<FlowFileRecord> destination, int maxResults, Set<FlowFileRecord> expiredRecords); + + List<FlowFileRecord> poll(FlowFileFilter filter, Set<FlowFileRecord> expiredRecords); + + String getFlowFileExpiration(); + + int getFlowFileExpiration(TimeUnit timeUnit); + + void setFlowFileExpiration(String flowExpirationPeriod); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java new file mode 100644 index 0000000..d482eae --- /dev/null +++ 
b/nifi-api/src/main/java/org/apache/nifi/controller/ScheduledState.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +/** + * Indicates the valid values for the state of a <code>Triggerable</code> entity + * with respect to scheduling the entity to run. + */ +public enum ScheduledState { + + /** + * Entity cannot be scheduled to run + */ + DISABLED, + /** + * Entity can be scheduled to run but currently is not + */ + STOPPED, + /** + * Entity is currently scheduled to run + */ + RUNNING; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java b/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java new file mode 100644 index 0000000..93f3327 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/Snippet.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.Set; + +/** + * A Snippet represents a segment of the flow + */ +public interface Snippet { + + /** + * @return id of this snippet + */ + public String getId(); + + /** + * @return Whether or not this snippet is linked to the data flow. If the Snippet is + * deleted and is linked, then the underlying components will also be + * deleted. 
If the Snippet is deleted and is NOT linked, only the Snippet is + * removed + */ + public boolean isLinked(); + + /** + * @return parent group id of the components in this snippet + */ + public String getParentGroupId(); + + /** + * @return connections in this snippet + */ + public Set<String> getConnections(); + + /** + * @return funnels in this snippet + */ + public Set<String> getFunnels(); + + /** + * @return input ports in this snippet + */ + public Set<String> getInputPorts(); + + /** + * @return output ports in this snippet + */ + public Set<String> getOutputPorts(); + + /** + * @return labels in this snippet + */ + public Set<String> getLabels(); + + /** + * @return the identifiers of all ProcessGroups in this Snippet + */ + public Set<String> getProcessGroups(); + + /** + * @return the identifiers of all Processors in this Snippet + */ + public Set<String> getProcessors(); + + /** + * @return the identifiers of all RemoteProcessGroups in this Snippet + */ + public Set<String> getRemoteProcessGroups(); + + /** + * @return Determines if this snippet is empty + */ + public boolean isEmpty(); + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/Triggerable.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/Triggerable.java b/nifi-api/src/main/java/org/apache/nifi/controller/Triggerable.java new file mode 100644 index 0000000..4b3149b --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/Triggerable.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller; + +import java.util.concurrent.TimeUnit; + +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; + +public interface Triggerable { + + public static final long MINIMUM_SCHEDULING_NANOS = 30000L; + + /** + * <p> + * The method called when this processor is triggered to operate by the + * controller. This method may be called concurrently from different + * threads. When this method is called depends on how this processor is + * configured within a controller to be triggered (timing or event + * based).</p> + * + * <p> + * The processor may commit, roll back, or allow the framework to + * automatically manage the session. If the sessions are to be managed by + * the framework (recommended) then what it will do depends on several + * factors. If the method call returns due to an exception then the session + * will be rolled back. 
If the method returns normally then the session will + * be committed or the framework may use the session again for another + * processor downstream</p> + * + * @param context in which the component is triggered + * @param sessionFactory used to generate {@link ProcessSession}s to use for + * operating on flow files within the repository + * + * @throws ProcessException if processing did not complete normally though + * indicates the problem is an understood potential outcome of processing. + * The controller/caller will handle these exceptions gracefully such as + * logging, etc. If another type of exception is allowed to propagate the + * controller may no longer trigger this processor to operate as this would + * indicate a probable coding defect. + */ + void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException; + + /** + * Determines the number of concurrent tasks that may be running for this + * <code>Triggerable</code>. + * + * @param taskCount a number of concurrent tasks this processor may have + * running + * @throws IllegalArgumentException if the given value is less than 1 + */ + void setMaxConcurrentTasks(int taskCount); + + /** + * @return the number of tasks that may execute concurrently for this + * <code>Triggerable</code>. + */ + int getMaxConcurrentTasks(); + + /** + * Indicates the {@link ScheduledState} of this <code>Triggerable</code>. A + * value of stopped does NOT indicate that the <code>Triggerable</code> has + * no active threads, only that it is not currently scheduled to be given + * any more threads. To determine whether or not the + * <code>Triggerable</code> has any active threads, see + * {@link ProcessScheduler#getActiveThreadCount(nifi.connectable.Connectable)}. + * + * @return the schedule state + */ + ScheduledState getScheduledState(); + + /** + * Indicates whether or not this <code>Triggerable</code> is "running". 
It + * is considered "running" if it is scheduled to run OR if it is no longer + * scheduled to be given threads but the remaining threads from the last + * invocation of {@link #onTrigger(ProcessContext, ProcessSessionFactory)} + * have not yet returned + * + * @return true if running; false otherwise + */ + boolean isRunning(); + + /** + * @param timeUnit for the scheduling period of the component + * @return the amount of time between each scheduling period + */ + long getSchedulingPeriod(TimeUnit timeUnit); + + /** + * @return a string representation of the time between each scheduling + * period + */ + String getSchedulingPeriod(); + + /** + * Updates how often this Triggerable should be triggered to run + * + * @param schedulingPeriod to set + */ + void setScheduldingPeriod(String schedulingPeriod); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java b/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java new file mode 100644 index 0000000..aef80ac --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marker annotation a ControllerService implementation can use to indicate a + * method should be called after all of the properties have been set for the + * Controller Service. Methods using this annotation must take either 0 + * arguments or a single argument of type + * {@link nifi.controller.ConfigurationContext ConfigurationContext}. + * + * + * @deprecated This annotation has been replaced by those in the + * {@link org.apache.nifi.annotation.lifecycle} package. 
+ */ +@Documented +@Target({ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +@Inherited +@Deprecated +public @interface OnConfigured { + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java new file mode 100644 index 0000000..ee3ead9 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/ContentRepository.java @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.file.Path; +import java.util.Collection; +import java.util.Set; + +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.controller.repository.claim.ContentClaimManager; + +/** + * Defines the capabilities of a content repository. 
Append options are not + * available on the methods but a merge capability is provided which between + * that and creating new claims a merge is available. + * + */ +public interface ContentRepository { + + /** + * Initializes the Content Repository, providing to it the + * ContentClaimManager that is to be used for interacting with Content + * Claims + * + * @param claimManager to handle claims + * @throws java.io.IOException if unable to init + */ + void initialize(ContentClaimManager claimManager) throws IOException; + + /** + * Shuts down the Content Repository, freeing any resources that may be + * held. This is called when an administrator shuts down NiFi. + */ + void shutdown(); + + /** + * @return the names of all Containers that exist for this Content + * Repository + */ + Set<String> getContainerNames(); + + /** + * @param containerName name of container to check capacity on + * @return the maximum number of bytes that can be stored in the storage + * mechanism that backs the container with the given name + * @throws java.io.IOException if unable to check capacity + * @throws IllegalArgumentException if no container exists with the given + * name + */ + long getContainerCapacity(String containerName) throws IOException; + + /** + * @param containerName to check space on + * @return the number of bytes available to be used by the storage + * mechanism that backs the container with the given name + * @throws java.io.IOException if unable to check space + * @throws IllegalArgumentException if no container exists with the given + * name + */ + long getContainerUsableSpace(String containerName) throws IOException; + + /** + * Creates a new content claim + * + * @param lossTolerant indicates whether the content for the new claim is + * loss tolerant. 
If true the repository might choose more volatile storage + * options which could increase performance for a tradeoff with reliability + * @return newly created claim + * @throws java.io.IOException if unable to create claim + */ + ContentClaim create(boolean lossTolerant) throws IOException; + + /** + * Increments the number of claimants for the given claim + * + * @param claim to increment + * @return the number of claimants after incrementing + */ + int incrementClaimaintCount(ContentClaim claim); + + /** + * Obtains the current number of claimants for the given claim + * + * @param claim to get count of + * @return the number of claimants + */ + int getClaimantCount(ContentClaim claim); + + /** + * Reduces the number of claimants for the given claim. Even if the given + * claim is null or content cannot be found or removed no exception will be + * thrown. + * + * @param claim to decrement + * @return new claimant count for the given claim + */ + int decrementClaimantCount(ContentClaim claim); + + /** + * Removes the content indicated by the given claim + * + * @param claim to remove + * + * @return a boolean indicating whether or not the destruction of the claim + * was successful + */ + boolean remove(ContentClaim claim); + + /** + * Clones the content for the given content claim and returns content claim + * of the new object + * + * @param original to clone + * @param lossTolerant if can be placed in a loss tolerant repository + * @return new claim + * @throws IOException if an IO error occurs. 
Any content written to the new + * destination prior to the error will be destroyed + */ + ContentClaim clone(ContentClaim original, boolean lossTolerant) throws IOException; + + /** + * Creates a new content item that is the merger in iteration order of all + * content for the given claims + * + * @return the size of the destination + * @param claims the claims to merge which will be combined in order of + * collection iteration + * @param destination the claim to write the merged content to + * @param header if supplied will be prepended to the output + * @param footer if supplied will be appended to the output + * @param demarcator if supplied will be placed in between each merged + * object + * @throws IOException if unable to merge + * @throws IllegalArgumentException if the given destination is included in + * the given claims + */ + long merge(Collection<ContentClaim> claims, ContentClaim destination, byte[] header, byte[] footer, byte[] demarcator) throws IOException; + + /** + * Imports content from the given path creating a new content object and + * claim within the repository. 
+ * + * @return the size of the claim + * @param content to import from + * @param claim the claim to write imported content to + * @throws IOException if failure to read given content + */ + long importFrom(Path content, ContentClaim claim) throws IOException; + + /** + * Imports content from the given path to the specified claim, appending or + * replacing the current claim, according to the value of the append + * argument + * + * @return the size of the claim + * @param content to import from + * @param claim the claim to write imported content to + * @param append if true, the content will be appended to the claim; if + * false, the content will replace the contents of the claim + * @throws IOException if unable to read content + */ + long importFrom(Path content, ContentClaim claim, boolean append) throws IOException; + + /** + * Imports content from the given stream creating a new content object and + * claim within the repository. + * + * @return the size of the claim + * @param content to import from + * @param claim the claim to write imported content to + * @throws IOException if unable to read content + */ + long importFrom(InputStream content, ContentClaim claim) throws IOException; + + /** + * Imports content from the given stream, appending or replacing the current + * claim, according to the value of the append argument + * + * @param content to import from + * @param claim to write to + * @param append whether to append or replace + * @return length of data imported in bytes + * @throws IOException if failure to read or write stream + */ + long importFrom(InputStream content, ContentClaim claim, boolean append) throws IOException; + + /** + * Exports the content of the given claim to the given destination. 
+ * + * @return the size of the destination or the claim + * @param claim to export from + * @param destination where to export data + * @param append if true appends to the destination; false overwrites + * @throws IOException if an IO error occurs. The state of the content for + * the given destination is unknown and callers should consider whether they + * should clean up any partially created paths + */ + long exportTo(ContentClaim claim, Path destination, boolean append) throws IOException; + + /** + * Exports the content of the given claim to the given destination. + * + * @return the size of the destination or the claim + * @param claim to export from + * @param destination where to export data + * @param append if true appends to the destination; false overwrites + * @param offset the offset at which the claim should start being copied + * @param length the number of bytes to copy + * @throws IOException if an IO error occurs. The state of the content for + * the given destination is unknown and callers should consider whether they + * should clean up any partially created paths + */ + long exportTo(ContentClaim claim, Path destination, boolean append, long offset, long length) throws IOException; + + /** + * Exports the content of the given claim to the given destination. + * + * @return the size of the claim + * @param claim to export from + * @param destination where to export data + * @throws IOException if an IO error occurs. + */ + long exportTo(ContentClaim claim, OutputStream destination) throws IOException; + + /** + * Exports a subset of the content of the given claim, starting at offset + * and copying length bytes, to the given destination. + * + * @return the number of bytes copied + * @param claim to export from + * @param destination where to export data + * @param offset the offset into the claim at which the copy should begin + * @param length the number of bytes to copy + * @throws IOException if an IO error occurs. 
+ */ + long exportTo(ContentClaim claim, OutputStream destination, long offset, long length) throws IOException; + + /** + * @param claim to get size of + * @return size in bytes of content for given claim + * @throws IOException if size check failed + */ + long size(ContentClaim claim) throws IOException; + + /** + * Provides access to the input stream for the given claim + * + * @param claim to read from + * @return InputStream over the content of the given claim + * @throws IOException if unable to read + */ + InputStream read(ContentClaim claim) throws IOException; + + /** + * Obtains an OutputStream to the content for the given claim. + * + * @param claim to write to + * @return the stream to write to + * @throws IOException if unable to obtain stream + */ + OutputStream write(ContentClaim claim) throws IOException; + + /** + * Purges the contents of the repository, as if the repository were newly + * created. + */ + void purge(); + + /** + * Performs any cleanup actions that may need to be taken upon system + * restart. For example, if content was partially written to the repository + * before the restart, the repository is given a chance to handle this data + */ + void cleanup(); + + /** + * @param contentClaim the Content Claim to check + * @return Returns a boolean indicating whether or not the content specified + * by the given claim can be read, regardless of whether the content has + * been archived or not. 
If the ContentRepository does not implement + * archiving capabilities, this method will return <code>false</code> + * + * @throws IOException if unable to determine accessibility + */ + boolean isAccessible(ContentClaim contentClaim) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java new file mode 100644 index 0000000..f2493f6 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRecord.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.repository.claim.ContentClaim; +import org.apache.nifi.flowfile.FlowFile; + +/** + * <code>FlowFileRecord</code> is a sub-interface of <code>FlowFile</code> and + * is used to provide additional information about FlowFiles that provide + * valuable information to the framework but should be hidden from components + */ +public interface FlowFileRecord extends FlowFile { + + /** + * @return the time (in millis since epoch) at which this FlowFile should no + * longer be penalized + */ + long getPenaltyExpirationMillis(); + + /** + * @return the {@link ContentClaim} that holds the FlowFile's content + */ + ContentClaim getContentClaim(); + + /** + * @return the byte offset into the {@link ContentClaim} at which the + * FlowFile's content occurs. This mechanism allows multiple FlowFiles to + * have the same ContentClaim, which can be significantly more efficient for + * some implementations of + * {@link org.apache.nifi.controller.repository.ContentRepository ContentRepository} + */ + long getContentClaimOffset(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java new file mode 100644 index 0000000..5e59e04 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileRepository.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaimManager; + +/** + * Implementations must be thread safe + * + */ +public interface FlowFileRepository extends Closeable { + + /** + * Initializes the FlowFile Repository, providing to it the + * ContentClaimManager that is to be used for interacting with Content + * Claims + * + * @param claimManager for handling claims + * @throws java.io.IOException if unable to initialize repository + */ + void initialize(ContentClaimManager claimManager) throws IOException; + + /** + * @return the maximum number of bytes that can be stored in the underlying + * storage mechanism + * + * @throws IOException if computing capacity fails + */ + long getStorageCapacity() throws IOException; + + /** + * @return the number of bytes currently available for use by the underlying + * storage mechanism + * + * @throws IOException if computing usable space fails + */ + long getUsableStorageSpace() throws IOException; + + /** + * Updates the repository with the given RepositoryRecords. 
+ * + * @param records the records to update the repository with + * @throws java.io.IOException if update fails + */ + void updateRepository(Collection<RepositoryRecord> records) throws IOException; + + /** + * Loads all flow files found within the repository, establishes the content + * claims and their reference count + * + * @param queueProvider the provider of FlowFile Queues into which the + * FlowFiles should be enqueued + * @param minimumSequenceNumber specifies the minimum value that should be + * returned by a call to {@link #getNextFlowFileSequence()} + * + * @return index of highest flow file identifier + * @throws IOException if load fails + */ + long loadFlowFiles(QueueProvider queueProvider, long minimumSequenceNumber) throws IOException; + + /** + * @return <code>true</code> if the Repository is volatile (i.e., its data + * is lost upon application restart), <code>false</code> otherwise + */ + boolean isVolatile(); + + /** + * @return the next ID in sequence for creating <code>FlowFile</code>s. + */ + long getNextFlowFileSequence(); + + /** + * @return the max ID of all <code>FlowFile</code>s that currently exist in + * the repository. 
+ * @throws IOException if computing max identifier fails + */ + long getMaxFlowFileIdentifier() throws IOException; + + /** + * Updates the Repository to indicate that the given FlowFileRecords were + * Swapped Out of memory + * + * @param swappedOut the FlowFiles that were swapped out of memory + * @param flowFileQueue the queue that the FlowFiles belong to + * @param swapLocation the location to which the FlowFiles were swapped + * + * @throws IOException if swap fails + */ + void swapFlowFilesOut(List<FlowFileRecord> swappedOut, FlowFileQueue flowFileQueue, String swapLocation) throws IOException; + + /** + * Updates the Repository to indicate that the given FlowFileRecords were + * Swapped In to memory + * + * @param swapLocation the location (e.g., a filename) from which FlowFiles + * were recovered + * @param flowFileRecords the records that were swapped in + * @param flowFileQueue the queue that the FlowFiles belong to + * + * @throws IOException if swap fails + */ + void swapFlowFilesIn(String swapLocation, List<FlowFileRecord> flowFileRecords, FlowFileQueue flowFileQueue) throws IOException; +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java new file mode 100644 index 0000000..869e2b3 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/FlowFileSwapManager.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.repository.claim.ContentClaimManager; +import org.apache.nifi.events.EventReporter; + +/** + * Defines a mechanism by which FlowFiles can be moved into external storage or + * memory so that they can be removed from the Java heap and vice-versa + */ +public interface FlowFileSwapManager { + + /** + * Starts the Manager's background threads to start swapping FlowFiles in + * and out of memory + * + * @param flowFileRepository the FlowFileRepository that must be notified of + * any swapping in or out of FlowFiles + * @param queueProvider the provider of FlowFileQueue's so that FlowFiles + * can be obtained and restored + * @param claimManager the ContentClaimManager to use for interacting with + * Content Claims + * @param reporter the EventReporter that can be used for notifying users of + * important events + */ + void start(FlowFileRepository flowFileRepository, QueueProvider queueProvider, ContentClaimManager claimManager, EventReporter reporter); + + /** + * Shuts down the manager + */ + void shutdown(); + + /** + * Removes all Swap information, permanently destroying any FlowFiles that + * have been swapped out + */ + void purge(); + + /** + * Notifies FlowFile queues of the number of FlowFiles and content size of + * all FlowFiles that are currently swapped out + * + * @param connectionProvider 
provider + * @param claimManager manager + * @return how many flowfiles have been recovered + */ + long recoverSwappedFlowFiles(QueueProvider connectionProvider, ContentClaimManager claimManager); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java new file mode 100644 index 0000000..fcb516d --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/QueueProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.repository; + +import java.util.Collection; + +import org.apache.nifi.controller.FlowFileQueue; + +/** + * Provides a collection of <code>FlowFileQueue</code>s that represents all + * queues in the current flow + */ +public interface QueueProvider { + + /** + * @return all <code>FlowFileQueue</code>s that currently exist in the flow + */ + Collection<FlowFileQueue> getAllQueues(); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java new file mode 100644 index 0000000..40d44a8 --- /dev/null +++ b/nifi-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecord.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.repository; + +import org.apache.nifi.controller.FlowFileQueue; +import org.apache.nifi.controller.repository.claim.ContentClaim; + +/** + * Represents an abstraction of a FlowFile that can be used to track changing + * state of a FlowFile so that transactionality with repositories can exist + */ +public interface RepositoryRecord { + + /** + * @return FlowFileQueue to which the FlowFile is to be transferred + */ + FlowFileQueue getDestination(); + + /** + * @return FlowFileQueue from which the record was pulled + */ + FlowFileQueue getOriginalQueue(); + + /** + * @return type of update that this record encapsulates + */ + RepositoryRecordType getType(); + + /** + * @return current ContentClaim for the FlowFile + */ + ContentClaim getCurrentClaim(); + + /** + * @return original ContentClaim for the FlowFile before any changes were made + */ + ContentClaim getOriginalClaim(); + + /** + * @return byte offset into the Content Claim where this FlowFile's content + * begins + */ + long getCurrentClaimOffset(); + + /** + * @return FlowFile being encapsulated by this record + */ + FlowFileRecord getCurrent(); + + /** + * @return Whether or not the FlowFile's attributes have changed since the FlowFile + * was pulled from its queue (or created) + */ + boolean isAttributesChanged(); + + /** + * @return <code>true</code> if the FlowFile is not viable and should be + * aborted (e.g., the content of the FlowFile cannot be found) + */ + boolean isMarkedForAbort(); + + /** + * @return If the FlowFile is swapped out of the Java heap space, provides the + * location of the swap file, or <code>null</code> if the FlowFile is not + * swapped out + */ + String getSwapLocation(); +}