nifi-992 Improvements based on code review. - Removed checkstyle and contrib-check profile since it's inherit from top-level pom. - Consolidate DOC_ID and DOC_ID_EXP into a single DOC_ID property. - Add capability description on GetCouchbaseKey. - Fixed documentation spell misses. - Handle Exceptions accordingly. - Add 'retry' relationship.
Signed-off-by: Bryan Bende <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/72eb64e8 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/72eb64e8 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/72eb64e8 Branch: refs/heads/NIFI-810-InputRequirement Commit: 72eb64e8a43a08a5be988a3826b0a116c57915ea Parents: 2466a24 Author: ijokarumawak <[email protected]> Authored: Wed Sep 30 00:58:39 2015 +0900 Committer: Bryan Bende <[email protected]> Committed: Tue Sep 29 13:48:13 2015 -0400 ---------------------------------------------------------------------- .../nifi-couchbase-processors/pom.xml | 144 ---------- .../nifi/couchbase/CouchbaseAttributes.java | 4 + .../couchbase/AbstractCouchbaseProcessor.java | 94 ++++--- .../couchbase/CouchbaseExceptionMappings.java | 128 +++++++++ .../couchbase/ErrorHandlingStrategy.java | 59 ++++ .../processors/couchbase/GetCouchbaseKey.java | 45 ++- .../processors/couchbase/PutCouchbaseKey.java | 45 +-- .../couchbase/TestCouchbaseClusterService.java | 2 +- .../couchbase/TestGetCouchbaseKey.java | 282 +++++++++++++++++-- .../couchbase/TestPutCouchbaseKey.java | 95 +++++-- 10 files changed, 626 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml index 33b0baa..257ef46 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/pom.xml @@ -61,148 +61,4 @@ <scope>test</scope> </dependency> </dependencies> - <build> - <pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <version>2.15</version> - <dependencies> - <dependency> - <groupId>com.puppycrawl.tools</groupId> - <artifactId>checkstyle</artifactId> - <version>6.5</version> - </dependency> - </dependencies> - </plugin> - </plugins> - </pluginManagement> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <configuration> - <checkstyleRules> - <module name="Checker"> - <property name="charset" value="UTF-8" /> - <property name="severity" value="warning" /> - <!-- Checks for whitespace --> - <!-- See http://checkstyle.sf.net/config_whitespace.html --> - <module name="FileTabCharacter"> - <property name="eachLine" value="true" /> - </module> - <module name="TreeWalker"> - <module name="RegexpSinglelineJava"> - <property name="format" value="\s+$" /> - <property name="message" value="Line has trailing whitespace." /> - </module> - <module name="RegexpSinglelineJava"> - <property name="format" value="[@]see\s+[{][@]link" /> - <property name="message" value="Javadoc @see does not need @link: pick one or the other." /> - </module> - <module name="OuterTypeFilename" /> - <module name="LineLength"> - <!-- needs extra, because Eclipse formatter ignores the ending left - brace --> - <property name="max" value="200" /> - <property name="ignorePattern" value="^package.*|^import.*|a href|href|http://|https://|ftp://" /> - </module> - <module name="AvoidStarImport" /> - <module name="UnusedImports"> - <property name="processJavadoc" value="true" /> - </module> - <module name="NoLineWrap" /> - <module name="LeftCurly"> - <property name="maxLineLength" value="160" /> - </module> - <module name="RightCurly" /> - <module name="RightCurly"> - <property name="option" value="alone" /> - <property name="tokens" value="CLASS_DEF, METHOD_DEF, CTOR_DEF, LITERAL_FOR, LITERAL_WHILE, LITERAL_DO, STATIC_INIT, INSTANCE_INIT" /> - </module> - <module name="SeparatorWrap"> - <property name="tokens" value="DOT" /> - <property name="option" value="nl" /> - </module> - <module name="SeparatorWrap"> - <property name="tokens" value="COMMA" /> - <property name="option" value="EOL" /> - </module> - <module name="PackageName"> - <property name="format" value="^[a-z]+(\.[a-z][a-zA-Z0-9]*)*$" /> - </module> - <module name="MethodTypeParameterName"> - <property name="format" value="(^[A-Z][0-9]?)$|([A-Z][a-zA-Z0-9]*[T]$)" /> - </module> - <module name="MethodParamPad" /> - <module name="OperatorWrap"> - <property name="option" value="NL" /> - <property name="tokens" value="BAND, BOR, BSR, BXOR, DIV, EQUAL, GE, GT, LAND, LE, LITERAL_INSTANCEOF, LOR, LT, MINUS, MOD, NOT_EQUAL, QUESTION, SL, SR, STAR " /> - </module> - <module name="AnnotationLocation"> - <property name="tokens" value="CLASS_DEF, INTERFACE_DEF, ENUM_DEF, METHOD_DEF, CTOR_DEF" /> - </module> - <module name="AnnotationLocation"> - <property name="tokens" value="VARIABLE_DEF" /> - <property name="allowSamelineMultipleAnnotations" value="true" /> - </module> - <module name="NonEmptyAtclauseDescription" /> - <module name="JavadocMethod"> - <property name="allowMissingJavadoc" value="true" /> - <property name="allowMissingParamTags" value="true" /> - <property name="allowMissingThrowsTags" value="true" /> - <property name="allowMissingReturnTag" value="true" /> - <property name="allowedAnnotations" value="Override,Test,BeforeClass,AfterClass,Before,After" /> - <property name="allowThrowsTagsForSubclasses" value="true" /> - </module> - <module name="SingleLineJavadoc" /> - </module> - </module> - </checkstyleRules> - <violationSeverity>warning</violationSeverity> - <includeTestSourceDirectory>true</includeTestSourceDirectory> - </configuration> - </plugin> - </plugins> - </build> - <profiles> - <profile> - <!-- Checks style and licensing requirements. This is a good idea to run - for contributions and for the release process. While it would be nice to - run always these plugins can considerably slow the build and have proven - to create unstable builds in our multi-module project and when building using - multiple threads. The stability issues seen with Checkstyle in multi-module - builds include false-positives and false negatives. --> - <id>contrib-check</id> - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>check</goal> - </goals> - <phase>verify</phase> - </execution> - </executions> - </plugin> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-checkstyle-plugin</artifactId> - <executions> - <execution> - <id>check-style</id> - <goals> - <goal>check</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - </profile> - </profiles> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java index a4d69fc..3bef8c5 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/couchbase/CouchbaseAttributes.java @@ -43,6 +43,10 @@ public enum CouchbaseAttributes implements FlowFileAttributeKey { * The expiration of a related document. */ Expiry("couchbase.doc.expiry"), + /** + * The thrown CouchbaseException class. + */ + Exception("couchbase.exception"), ; private final String key; http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java index d370728..066b1ca 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/AbstractCouchbaseProcessor.java @@ -23,13 +23,19 @@ import java.util.List; import java.util.Set; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessorInitializationContext; import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; +import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.java.Bucket; /** @@ -46,49 +52,45 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { .build(); public static final PropertyDescriptor DOC_ID = new PropertyDescriptor - .Builder().name("Static Document Id") - .description("A static, fixed Couchbase document id.") - .required(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - - public static final PropertyDescriptor DOC_ID_EXP = new PropertyDescriptor - .Builder().name("Document Id Expression") - .description("An expression to construct the Couchbase document id." - + " If 'Static Document Id' is specified, then 'Static Document Id' is used.") - .required(false) + .Builder().name("Document Id") + .description("A static, fixed Couchbase document id." + + "Or an expression to construct the Couchbase document id.") .expressionLanguageSupported(true) - .addValidator(StandardValidators.ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.") - .build(); + .name("success") + .description("All FlowFiles that are written to Couchbase Server are routed to this relationship.") + .build(); public static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("The original input file will be routed to this destination when it has been successfully processed.") - .build(); + .name("original") + .description("The original input file will be routed to this destination when it has been successfully processed.") + .build(); + public static final Relationship REL_RETRY = new Relationship.Builder() + .name("retry") + .description("All FlowFiles that cannot written to Couchbase Server but can be retried are routed to this relationship.") + .build(); public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("All FlowFiles that cannot written to Couchbase Server are routed to this relationship.") - .build(); + .name("failure") + .description("All FlowFiles that cannot written to Couchbase Server and can't be retried are routed to this relationship.") + .build(); public static final PropertyDescriptor COUCHBASE_CLUSTER_SERVICE = new PropertyDescriptor - .Builder().name("Couchbase Cluster Controller Service") - .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") - .required(true) - .identifiesControllerService(CouchbaseClusterControllerService.class) - .build(); + .Builder().name("Couchbase Cluster Controller Service") + .description("A Couchbase Cluster Controller Service which manages connections to a Couchbase cluster.") + .required(true) + .identifiesControllerService(CouchbaseClusterControllerService.class) + .build(); public static final PropertyDescriptor BUCKET_NAME = new PropertyDescriptor - .Builder().name("Bucket Name") - .description("The name of bucket to access.") - .required(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .defaultValue("default") - .build(); + .Builder().name("Bucket Name") + .description("The name of bucket to access.") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .defaultValue("default") + .build(); private List<PropertyDescriptor> descriptors; @@ -171,4 +173,32 @@ public abstract class AbstractCouchbaseProcessor extends AbstractProcessor { .toString(); } + /** + * Handles the thrown CocuhbaseException accordingly. + * @param session a process session + * @param logger a logger + * @param inFile an input FlowFile + * @param e the thrown CouchbaseException + * @param errMsg a message to be logged + */ + protected void handleCouchbaseException(final ProcessSession session, + final ProcessorLog logger, FlowFile inFile, CouchbaseException e, + String errMsg) { + logger.error(errMsg, e); + if(inFile != null){ + ErrorHandlingStrategy strategy = CouchbaseExceptionMappings.getStrategy(e); + switch(strategy.result()) { + case ProcessException: + throw new ProcessException(errMsg, e); + case Failure: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_FAILURE); + break; + case Retry: + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), e.getClass().getName()); + session.transfer(inFile, REL_RETRY); + break; + } + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java new file mode 100644 index 0000000..87ffabb --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/CouchbaseExceptionMappings.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.ConfigurationError; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Fatal; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.InvalidInput; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalClusterError; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.TemporalFlowFileError; + +import java.util.HashMap; +import java.util.Map; + +import com.couchbase.client.core.BackpressureException; +import com.couchbase.client.core.BucketClosedException; +import com.couchbase.client.core.CouchbaseException; +import com.couchbase.client.core.DocumentConcurrentlyModifiedException; +import com.couchbase.client.core.DocumentMutationLostException; +import com.couchbase.client.core.ReplicaNotConfiguredException; +import com.couchbase.client.core.RequestCancelledException; +import com.couchbase.client.core.ServiceNotAvailableException; +import com.couchbase.client.core.config.ConfigurationException; +import com.couchbase.client.core.endpoint.SSLException; +import com.couchbase.client.core.endpoint.kv.AuthenticationException; +import com.couchbase.client.core.env.EnvironmentException; +import com.couchbase.client.core.state.NotConnectedException; +import com.couchbase.client.java.error.BucketDoesNotExistException; +import com.couchbase.client.java.error.CannotRetryException; +import com.couchbase.client.java.error.CouchbaseOutOfMemoryException; +import com.couchbase.client.java.error.DurabilityException; +import com.couchbase.client.java.error.InvalidPasswordException; +import com.couchbase.client.java.error.RequestTooBigException; +import com.couchbase.client.java.error.TemporaryFailureException; +import com.couchbase.client.java.error.TranscodingException; + +public class CouchbaseExceptionMappings { + + private static final Map<Class<? extends CouchbaseException>, ErrorHandlingStrategy>mapping = new HashMap<>(); + + /* + * - Won't happen + * BucketAlreadyExistsException: never create a bucket + * CASMismatchException: cas-id and replace is not used yet + * DesignDocumentException: View is not used yet + * DocumentAlreadyExistsException: insert is not used yet + * DocumentDoesNotExistException: replace is not used yet + * FlushDisabledException: never call flush + * RepositoryMappingException: EntityDocument is not used + * TemporaryLockFailureException: we don't obtain locks + * ViewDoesNotExistException: View is not used yet + * NamedPreparedStatementException: N1QL is not used yet + * QueryExecutionException: N1QL is not used yet + */ + static { + /* + * ConfigurationError + */ + mapping.put(AuthenticationException.class, ConfigurationError); + mapping.put(BucketDoesNotExistException.class, ConfigurationError); + mapping.put(ConfigurationException.class, ConfigurationError); + mapping.put(InvalidPasswordException.class, ConfigurationError); + mapping.put(EnvironmentException.class, ConfigurationError); + // when Couchbase doesn't have enough replica + mapping.put(ReplicaNotConfiguredException.class, ConfigurationError); + // when a particular Service(KV, View, Query, DCP) isn't running in a cluster + mapping.put(ServiceNotAvailableException.class, ConfigurationError); + // SSL configuration error, such as key store mis configuration. + mapping.put(SSLException.class, ConfigurationError); + + /* + * InvalidInput + */ + mapping.put(RequestTooBigException.class, InvalidInput); + mapping.put(TranscodingException.class, InvalidInput); + + /* + * Temporal Cluster Error + */ + mapping.put(BackpressureException.class, TemporalClusterError); + mapping.put(CouchbaseOutOfMemoryException.class, TemporalClusterError); + mapping.put(TemporaryFailureException.class, TemporalClusterError); + // occurs when a connection gets lost + mapping.put(RequestCancelledException.class, TemporalClusterError); + + /* + * Temporal FlowFile Error + */ + mapping.put(DocumentConcurrentlyModifiedException.class, TemporalFlowFileError); + mapping.put(DocumentMutationLostException.class, TemporalFlowFileError); + mapping.put(DurabilityException.class, TemporalFlowFileError); + + /* + * Fatal + */ + mapping.put(BucketClosedException.class, Fatal); + mapping.put(CannotRetryException.class, Fatal); + mapping.put(NotConnectedException.class, Fatal); + } + + /** + * Returns a registered error handling strategy. + * @param e the CouchbaseException + * @return a registered strategy, if it's not registered, then return Fatal + */ + public static ErrorHandlingStrategy getStrategy(CouchbaseException e){ + ErrorHandlingStrategy strategy = mapping.get(e.getClass()); + if(strategy == null) { + // Treat unknown Exception as Fatal. + return ErrorHandlingStrategy.Fatal; + } + return strategy; + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java new file mode 100644 index 0000000..75b8f46 --- /dev/null +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/ErrorHandlingStrategy.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.couchbase; + +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Penalize; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Penalty.Yield; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Failure; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.ProcessException; +import static org.apache.nifi.processors.couchbase.ErrorHandlingStrategy.Result.Retry; + + +public enum ErrorHandlingStrategy { + + ConfigurationError(ProcessException, Yield), + InvalidInput(Failure, Penalize), + TemporalClusterError(Retry, Yield), + TemporalFlowFileError(Retry, Penalize), + Fatal(Failure, Yield); + + private final Result result; + private final Penalty penalty; + private ErrorHandlingStrategy(Result result, Penalty penalty){ + this.result = result; + this.penalty = penalty; + } + + public enum Result { + ProcessException, Failure, Retry; + } + + /** + * Indicating yield or penalize the processing when transfer the input FlowFile. + */ + public enum Penalty { + Yield, Penalize; + } + + public Result result(){ + return this.result; + } + + public Penalty penalty(){ + return this.penalty; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java index 6d9a476..8c15e29 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/GetCouchbaseKey.java @@ -45,24 +45,27 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; +import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.document.BinaryDocument; import com.couchbase.client.java.document.Document; import com.couchbase.client.java.document.RawJsonDocument; +import com.couchbase.client.java.error.DocumentDoesNotExistException; @Tags({ "nosql", "couchbase", "database", "get" }) -@CapabilityDescription("Get a document from Couchbase Server via Key/Value access.") +@CapabilityDescription("Get a document from Couchbase Server via Key/Value access. This processor can be triggered by an incoming FlowFile, or it can be scheduled on a timer") @SeeAlso({CouchbaseClusterControllerService.class}) @ReadsAttributes({ - @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + @ReadsAttribute(attribute = "FlowFile content", description = "Used as a document id if 'Document Id' is not specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") }) @WritesAttributes({ @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was retrieved from."), @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was retrieved from."), @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), + @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") }) public class GetCouchbaseKey extends AbstractCouchbaseProcessor { @@ -70,13 +73,13 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { protected void addSupportedProperties(List<PropertyDescriptor> descriptors) { descriptors.add(DOCUMENT_TYPE); descriptors.add(DOC_ID); - descriptors.add(DOC_ID_EXP); } @Override protected void addSupportedRelationships(Set<Relationship> relationships) { relationships.add(REL_SUCCESS); relationships.add(REL_ORIGINAL); + relationships.add(REL_RETRY); relationships.add(REL_FAILURE); } @@ -86,15 +89,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { FlowFile inFile = session.get(); String docId = null; - if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ - docId = context.getProperty(DOC_ID).getValue(); - } else { - // Otherwise docId has to be extracted from inFile. - if ( inFile == null ) { - return; - } - if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ - docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(inFile).getValue(); + try { + if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(inFile).getValue(); } else { final byte[] content = new byte[(int) inFile.getSize()]; session.read(inFile, new InputStreamCallback() { @@ -105,11 +102,14 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { }); docId = new String(content, StandardCharsets.UTF_8); } + } catch (Throwable t) { + throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); } if(StringUtils.isEmpty(docId)){ - logger.error("Couldn't get document id from from {}", new Object[]{inFile}); - session.transfer(inFile, REL_FAILURE); + if(inFile != null){ + throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + inFile); + } } try { @@ -137,8 +137,9 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { } if(doc == null) { - logger.info("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); + logger.warn("Document {} was not found in {}", new Object[]{docId, getTransitUrl(context)}); if(inFile != null){ + inFile = session.putAttribute(inFile, CouchbaseAttributes.Exception.key(), DocumentDoesNotExistException.class.getName()); session.transfer(inFile, REL_FAILURE); } return; @@ -160,13 +161,11 @@ public class GetCouchbaseKey extends AbstractCouchbaseProcessor { session.getProvenanceReporter().receive(outFile, getTransitUrl(context)); session.transfer(outFile, REL_SUCCESS); - } catch (Throwable t){ - logger.error("Getting docuement {} from Couchbase Server using {} failed due to {}", - new Object[]{docId, inFile, t}, t); - if(inFile != null){ - session.transfer(inFile, REL_FAILURE); - } + } catch (CouchbaseException e){ + String errMsg = String.format("Getting docuement %s from Couchbase Server using %s failed due to %s", docId, inFile, e); + handleCouchbaseException(session, logger, inFile, e, errMsg); } } + } http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java index 6bfa480..8f41383 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/main/java/org/apache/nifi/processors/couchbase/PutCouchbaseKey.java @@ -45,6 +45,7 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.InputStreamCallback; import org.apache.nifi.stream.io.StreamUtils; +import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.deps.io.netty.buffer.ByteBuf; import com.couchbase.client.deps.io.netty.buffer.Unpooled; import com.couchbase.client.java.PersistTo; @@ -57,15 +58,16 @@ import com.couchbase.client.java.document.RawJsonDocument; @CapabilityDescription("Put a document to Couchbase Server via Key/Value access.") @SeeAlso({CouchbaseClusterControllerService.class}) @ReadsAttributes({ - @ReadsAttribute(attribute = "uuid", description = "Used as a document id if none of 'Static Document Id' or 'Document Id Expression' is specified"), - @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id Excepression.") + @ReadsAttribute(attribute = "uuid", description = "Used as a document id if 'Document Id' is not specified"), + @ReadsAttribute(attribute = "*", description = "Any attribute can be used as part of a document id by 'Document Id' expression.") }) @WritesAttributes({ @WritesAttribute(attribute="couchbase.cluster", description="Cluster where the document was stored."), @WritesAttribute(attribute="couchbase.bucket", description="Bucket where the document was stored."), @WritesAttribute(attribute="couchbase.doc.id", description="Id of the document."), @WritesAttribute(attribute="couchbase.doc.cas", description="CAS of the document."), - @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document.") + @WritesAttribute(attribute="couchbase.doc.expiry", description="Expiration of the document."), + @WritesAttribute(attribute="couchbase.exception", description="If Couchbase related error occurs the CouchbaseException class name will be captured here.") }) public class PutCouchbaseKey extends AbstractCouchbaseProcessor { @@ -90,7 +92,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { protected void addSupportedProperties(List<PropertyDescriptor> descriptors) { descriptors.add(DOCUMENT_TYPE); descriptors.add(DOC_ID); - descriptors.add(DOC_ID_EXP); descriptors.add(PERSIST_TO); descriptors.add(REPLICATE_TO); } @@ -109,24 +110,25 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { return; } - try { - - final byte[] content = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, content, true); - } - }); - + String docId = null; + final byte[] content = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, content, true); + } + }); - String docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); + try { + docId = String.valueOf(flowFile.getAttribute(CoreAttributes.UUID.key())); if(!StringUtils.isEmpty(context.getProperty(DOC_ID).getValue())){ - docId = context.getProperty(DOC_ID).getValue(); - } else if(!StringUtils.isEmpty(context.getProperty(DOC_ID_EXP).getValue())){ - docId = context.getProperty(DOC_ID_EXP).evaluateAttributeExpressions(flowFile).getValue(); + docId = context.getProperty(DOC_ID).evaluateAttributeExpressions(flowFile).getValue(); } + } catch (Throwable t) { + throw new ProcessException("Please check 'Document Id' setting. Couldn't get document id from " + flowFile); + } + try { Document<?> doc = null; DocumentType documentType = DocumentType.valueOf(context.getProperty(DOCUMENT_TYPE).getValue()); switch (documentType){ @@ -141,7 +143,6 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { } } - PersistTo persistTo = PersistTo.valueOf(context.getProperty(PERSIST_TO).getValue()); ReplicateTo replicateTo = ReplicateTo.valueOf(context.getProperty(REPLICATE_TO).getValue()); doc = openBucket(context).upsert(doc, persistTo, replicateTo); @@ -155,9 +156,9 @@ public class PutCouchbaseKey extends AbstractCouchbaseProcessor { session.getProvenanceReporter().send(flowFile, getTransitUrl(context)); session.transfer(flowFile, REL_SUCCESS); - } catch (Throwable t) { - logger.error("Writing {} into Couchbase Server failed due to {}", new Object[]{flowFile, t}, t); - session.transfer(flowFile, REL_FAILURE); + } catch (CouchbaseException e) { + String errMsg = String.format("Writing docuement %s to Couchbase Server using %s failed due to %s", docId, flowFile, e); + handleCouchbaseException(session, logger, flowFile, e, errMsg); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java index d96b1c2..eb2220d 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestCouchbaseClusterService.java @@ -45,7 +45,7 @@ public class TestCouchbaseClusterService { @Test public void testConnectionFailure() throws InitializationException { - String connectionString = "couchbase://invalid-hostname"; + String connectionString = "invalid-protocol://invalid-hostname"; CouchbaseClusterControllerService service = new CouchbaseClusterService(); testRunner.addControllerService(SERVICE_ID, service); testRunner.setProperty(service, CouchbaseClusterService.CONNECTION_STRING, connectionString); http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java index 4ea4dff..dca2ae3 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestGetCouchbaseKey.java @@ -16,14 +16,16 @@ */ package org.apache.nifi.processors.couchbase; +import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOCUMENT_TYPE; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; -import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_ORIGINAL; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; +import static org.junit.Assert.fail; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,19 +36,28 @@ import java.util.Map; import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseClusterControllerService; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.couchbase.client.core.BackpressureException; +import com.couchbase.client.core.CouchbaseException; import com.couchbase.client.core.ServiceNotAvailableException; +import com.couchbase.client.core.endpoint.kv.AuthenticationException; +import com.couchbase.client.core.state.NotConnectedException; import com.couchbase.client.deps.io.netty.buffer.ByteBuf; import com.couchbase.client.deps.io.netty.buffer.Unpooled; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.document.BinaryDocument; import com.couchbase.client.java.document.RawJsonDocument; +import com.couchbase.client.java.error.DocumentDoesNotExistException; +import com.couchbase.client.java.error.DurabilityException; +import com.couchbase.client.java.error.RequestTooBigException; public class TestGetCouchbaseKey { @@ -92,6 +103,7 @@ public class TestGetCouchbaseKey { testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(content); @@ -104,57 +116,110 @@ public class TestGetCouchbaseKey { } - /** - * Use static document id even if doc id expression is set. - */ @Test - public void testStaticDocIdAndDocIdExp() throws Exception { - String docId = "doc-a"; - String docIdExp = "${someProperty}"; + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; Bucket bucket = mock(Bucket.class); String content = "{\"key\":\"value\"}"; - when(bucket.get(docId, RawJsonDocument.class)).thenReturn(RawJsonDocument.create(docId, content)); + when(bucket.get(somePropertyValue, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(somePropertyValue, content)); setupMockBucket(bucket); - testRunner.setProperty(DOC_ID, docId); - testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.setProperty(DOC_ID, docIdExp); + + byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8); + Map<String, String> properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileData, properties); testRunner.run(); - testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(content); } @Test - public void testDocIdExp() throws Exception { - String docIdExp = "${'someProperty'}"; - String somePropertyValue = "doc-p"; + public void testDocIdExpWithNullFlowFile() throws Exception { + String docIdExp = "doc-s"; + String docId = "doc-s"; Bucket bucket = mock(Bucket.class); String content = "{\"key\":\"value\"}"; - when(bucket.get(somePropertyValue, RawJsonDocument.class)) - .thenReturn(RawJsonDocument.create(somePropertyValue, content)); + when(bucket.get(docId, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(docId, content)); setupMockBucket(bucket); - testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.setProperty(DOC_ID, docIdExp); - byte[] inFileData = "input FlowFile data".getBytes(StandardCharsets.UTF_8); - Map<String, String> properties = new HashMap<>(); - properties.put("someProperty", somePropertyValue); - testRunner.enqueue(inFileData, properties); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 1); - testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(content); } @Test + public void testDocIdExpWithInvalidExpression() throws Exception { + String docIdExp = "${nonExistingFunction('doc-s')}"; + String docId = "doc-s"; + + Bucket bucket = mock(Bucket.class); + String content = "{\"key\":\"value\"}"; + when(bucket.get(docId, RawJsonDocument.class)) + .thenReturn(RawJsonDocument.create(docId, content)); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + } + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testDocIdExpWithInvalidExpressionOnFlowFile() throws Exception { + String docIdExp = "${nonExistingFunction(someProperty)}"; + + Bucket bucket = mock(Bucket.class); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + Map<String, String> properties = new HashMap<>(); + properties.put("someProperty", "someValue"); + testRunner.enqueue(inFileData, properties); + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + } + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test public void testInputFlowFileContent() throws Exception { Bucket bucket = mock(Bucket.class); @@ -171,9 +236,12 @@ public class TestGetCouchbaseKey { testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(content); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0); + orgFile.assertContentEquals(inFileDataStr); } @Test @@ -195,9 +263,12 @@ public class TestGetCouchbaseKey { testRunner.assertTransferCount(REL_SUCCESS, 1); testRunner.assertTransferCount(REL_ORIGINAL, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(content); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_ORIGINAL).get(0); + orgFile.assertContentEquals(inFileDataStr); } @@ -213,12 +284,175 @@ public class TestGetCouchbaseKey { byte[] inFileData = inFileDataStr.getBytes(StandardCharsets.UTF_8); testRunner.enqueue(inFileData); + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + } + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseConfigurationError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenThrow(new AuthenticationException()); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + Assert.assertTrue(e.getCause().getCause().getClass().equals(AuthenticationException.class)); + } + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseInvalidInputError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + CouchbaseException exception = new RequestTooBigException(); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + @Test + public void testCouchbaseTempClusterError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + CouchbaseException exception = new BackpressureException(); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + + @Test + public void testCouchbaseTempFlowFileError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + // There is no suitable CouchbaseException for temp flowfile error, currently. + CouchbaseException exception = new DurabilityException(); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + @Test + public void testCouchbaseFatalError() throws Exception { + String docIdExp = "doc-c"; + + Bucket bucket = mock(Bucket.class); + CouchbaseException exception = new NotConnectedException(); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); + testRunner.run(); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 1); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); + } + + @Test + public void testDocumentNotFound() throws Exception { + String docIdExp = "doc-n"; + + Bucket bucket = mock(Bucket.class); + when(bucket.get(docIdExp, RawJsonDocument.class)) + .thenReturn(null); + setupMockBucket(bucket); + + testRunner.setProperty(DOC_ID, docIdExp); + + String inputFileDataStr = "input FlowFile data"; + byte[] inFileData = inputFileDataStr.getBytes(StandardCharsets.UTF_8); + testRunner.enqueue(inFileData); testRunner.run(); testRunner.assertTransferCount(REL_SUCCESS, 0); testRunner.assertTransferCount(REL_ORIGINAL, 0); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 1); - MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); - outFile.assertContentEquals(inFileDataStr); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); + orgFile.assertContentEquals(inputFileDataStr); + orgFile.assertAttributeEquals(Exception.key(), DocumentDoesNotExistException.class.getName()); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/72eb64e8/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java index 3995528..0388e35 100644 --- a/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java +++ b/nifi-nar-bundles/nifi-couchbase-bundle/nifi-couchbase-processors/src/test/java/org/apache/nifi/processors/couchbase/TestPutCouchbaseKey.java @@ -16,13 +16,15 @@ */ package org.apache.nifi.processors.couchbase; +import static org.apache.nifi.couchbase.CouchbaseAttributes.Exception; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.BUCKET_NAME; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.COUCHBASE_CLUSTER_SERVICE; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID; -import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.DOC_ID_EXP; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_FAILURE; +import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_RETRY; import static org.apache.nifi.processors.couchbase.AbstractCouchbaseProcessor.REL_SUCCESS; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.eq; @@ -38,14 +40,18 @@ import java.util.Map; import org.apache.nifi.couchbase.CouchbaseAttributes; import org.apache.nifi.couchbase.CouchbaseClusterControllerService; import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; +import com.couchbase.client.core.CouchbaseException; +import com.couchbase.client.core.ServiceNotAvailableException; import com.couchbase.client.java.Bucket; import com.couchbase.client.java.PersistTo; import com.couchbase.client.java.ReplicateTo; @@ -102,6 +108,7 @@ public class TestPutCouchbaseKey { testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(inFileData); @@ -134,44 +141,44 @@ public class TestPutCouchbaseKey { testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(inFileData); } - /** - * Use static document id even if doc id expression is set. - */ @Test - public void testStaticDocIdAndDocIdExp() throws Exception { - String docId = "doc-a"; - String docIdExp = "${someProperty}"; + public void testDocIdExp() throws Exception { + String docIdExp = "${'someProperty'}"; + String somePropertyValue = "doc-p"; String inFileData = "{\"key\":\"value\"}"; byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); Bucket bucket = mock(Bucket.class); when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE))) - .thenReturn(RawJsonDocument.create(docId, inFileData)); + .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData)); setupMockBucket(bucket); - testRunner.enqueue(inFileDataBytes); - testRunner.setProperty(DOC_ID, docId); - testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.setProperty(DOC_ID, docIdExp); + + Map<String, String> properties = new HashMap<>(); + properties.put("someProperty", somePropertyValue); + testRunner.enqueue(inFileDataBytes, properties); testRunner.run(); verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); - testRunner.assertAllFlowFilesTransferred(REL_SUCCESS); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(inFileData); } @Test - public void testDocIdExp() throws Exception { - String docIdExp = "${'someProperty'}"; + public void testInvalidDocIdExp() throws Exception { + String docIdExp = "${invalid_function(someProperty)}"; String somePropertyValue = "doc-p"; String inFileData = "{\"key\":\"value\"}"; @@ -182,19 +189,21 @@ public class TestPutCouchbaseKey { .thenReturn(RawJsonDocument.create(somePropertyValue, inFileData)); setupMockBucket(bucket); - testRunner.setProperty(DOC_ID_EXP, docIdExp); + testRunner.setProperty(DOC_ID, docIdExp); Map<String, String> properties = new HashMap<>(); properties.put("someProperty", somePropertyValue); testRunner.enqueue(inFileDataBytes, properties); - testRunner.run(); - - verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.NONE)); + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + } - testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); - MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); - outFile.assertContentEquals(inFileData); } @Test @@ -219,6 +228,7 @@ public class TestPutCouchbaseKey { assertEquals(uuid, capture.getValue().id()); testRunner.assertTransferCount(REL_SUCCESS, 1); + testRunner.assertTransferCount(REL_RETRY, 0); testRunner.assertTransferCount(REL_FAILURE, 0); MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_SUCCESS).get(0); outFile.assertContentEquals(inFileData); @@ -235,20 +245,53 @@ public class TestPutCouchbaseKey { Bucket bucket = mock(Bucket.class); when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE))) - .thenThrow(new DurabilityException()); + .thenThrow(new ServiceNotAvailableException()); setupMockBucket(bucket); testRunner.enqueue(inFileDataBytes); testRunner.setProperty(DOC_ID, docId); testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); - testRunner.run(); + try { + testRunner.run(); + fail("ProcessException should be throws."); + } catch (AssertionError e){ + Assert.assertTrue(e.getCause().getClass().equals(ProcessException.class)); + } verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)); testRunner.assertAllFlowFilesTransferred(REL_FAILURE); testRunner.assertTransferCount(REL_SUCCESS, 0); - testRunner.assertTransferCount(REL_FAILURE, 1); - MockFlowFile outFile = testRunner.getFlowFilesForRelationship(REL_FAILURE).get(0); - outFile.assertContentEquals(inFileData); + testRunner.assertTransferCount(REL_RETRY, 0); + testRunner.assertTransferCount(REL_FAILURE, 0); + } + + @Test + public void testCouchbaseTempFlowFileError() throws Exception { + + String docId = "doc-a"; + + String inFileData = "{\"key\":\"value\"}"; + byte[] inFileDataBytes = inFileData.getBytes(StandardCharsets.UTF_8); + + Bucket bucket = mock(Bucket.class); + CouchbaseException exception = new DurabilityException(); + when(bucket.upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE))) + .thenThrow(exception); + setupMockBucket(bucket); + + testRunner.enqueue(inFileDataBytes); + testRunner.setProperty(DOC_ID, docId); + testRunner.setProperty(PutCouchbaseKey.REPLICATE_TO, ReplicateTo.ONE.toString()); + testRunner.run(); + + verify(bucket, times(1)).upsert(any(RawJsonDocument.class), eq(PersistTo.NONE), eq(ReplicateTo.ONE)); + + testRunner.assertTransferCount(REL_SUCCESS, 0); + testRunner.assertTransferCount(REL_RETRY, 1); + testRunner.assertTransferCount(REL_FAILURE, 0); + MockFlowFile orgFile = testRunner.getFlowFilesForRelationship(REL_RETRY).get(0); + orgFile.assertContentEquals(inFileData); + orgFile.assertAttributeEquals(Exception.key(), exception.getClass().getName()); } }
