NIFI-1811 Renaming MockProcessorLogger to MockComponentLogger for consistency. Removing unused imports from ExecuteScript causing checkstyle failures.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/1bd2cf0d Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/1bd2cf0d Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/1bd2cf0d Branch: refs/heads/master Commit: 1bd2cf0d09a7111bcecffd0f473aa71c25a69845 Parents: 372ffb8 Author: Aldrin Piri <[email protected]> Authored: Thu May 19 13:03:29 2016 -0400 Committer: Aldrin Piri <[email protected]> Committed: Thu May 19 14:38:41 2016 -0400 ---------------------------------------------------------------------- .../init/ControllerServiceInitializer.java | 4 +- .../init/ProcessorInitializer.java | 4 +- .../init/ReportingTaskingInitializer.java | 4 +- .../documentation/mock/MockComponentLogger.java | 258 +++++++++++ ...kControllerServiceInitializationContext.java | 2 +- .../MockProcessorInitializationContext.java | 2 +- .../documentation/mock/MockProcessorLogger.java | 258 ----------- .../MockReportingInitializationContext.java | 2 +- .../apache/nifi/controller/FlowController.java | 6 +- .../StandardProcessorInitializationContext.java | 4 +- .../nifi/processors/hive/SelectHiveQL.java | 30 +- .../processors/kafka/pubsub/KafkaPublisher.java | 8 +- .../AbstractKafkaProcessorLifecycelTest.java | 456 ------------------- .../AbstractKafkaProcessorLifecycleTest.java | 456 +++++++++++++++++++ .../nifi/processors/script/ExecuteScript.java | 2 - .../nifi/processors/standard/TransformJSON.java | 10 +- 16 files changed, 752 insertions(+), 754 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java index 7a6d62c..4d1651e 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ControllerServiceInitializer.java @@ -22,7 +22,7 @@ import org.apache.nifi.controller.ControllerService; import org.apache.nifi.documentation.ConfigurableComponentInitializer; import org.apache.nifi.documentation.mock.MockConfigurationContext; import org.apache.nifi.documentation.mock.MockControllerServiceInitializationContext; -import org.apache.nifi.documentation.mock.MockProcessorLogger; +import org.apache.nifi.documentation.mock.MockComponentLogger; import org.apache.nifi.documentation.util.ReflectionUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; @@ -49,7 +49,7 @@ public class ControllerServiceInitializer implements ConfigurableComponentInitia try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { ControllerService controllerService = (ControllerService) component; - final ComponentLog logger = new MockProcessorLogger(); + final ComponentLog logger = new MockComponentLogger(); final MockConfigurationContext context = new MockConfigurationContext(); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, controllerService, logger, context); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java index 7fe4946..7a66f72 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ProcessorInitializer.java @@ -21,7 +21,7 @@ import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.documentation.ConfigurableComponentInitializer; import org.apache.nifi.documentation.mock.MockProcessContext; import org.apache.nifi.documentation.mock.MockProcessorInitializationContext; -import org.apache.nifi.documentation.mock.MockProcessorLogger; +import org.apache.nifi.documentation.mock.MockComponentLogger; import org.apache.nifi.documentation.util.ReflectionUtils; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.nar.NarCloseable; @@ -47,7 +47,7 @@ public class ProcessorInitializer implements ConfigurableComponentInitializer { Processor processor = (Processor) component; try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { - final ComponentLog logger = new MockProcessorLogger(); + final ComponentLog logger = new MockComponentLogger(); final MockProcessContext context = new MockProcessContext(); ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, processor, logger, context); } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java index 171f1d9..32e878c 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/init/ReportingTaskingInitializer.java @@ -20,7 +20,7 @@ import org.apache.nifi.annotation.lifecycle.OnShutdown; import org.apache.nifi.components.ConfigurableComponent; import org.apache.nifi.documentation.ConfigurableComponentInitializer; import org.apache.nifi.documentation.mock.MockConfigurationContext; -import org.apache.nifi.documentation.mock.MockProcessorLogger; +import org.apache.nifi.documentation.mock.MockComponentLogger; import org.apache.nifi.documentation.mock.MockReportingInitializationContext; import org.apache.nifi.documentation.util.ReflectionUtils; import org.apache.nifi.nar.NarCloseable; @@ -48,7 +48,7 @@ public class ReportingTaskingInitializer implements ConfigurableComponentInitial try (NarCloseable narCloseable = NarCloseable.withNarLoader()) { final MockConfigurationContext context = new MockConfigurationContext(); - ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockProcessorLogger(), context); + ReflectionUtils.quietlyInvokeMethodsWithAnnotations(OnShutdown.class, org.apache.nifi.processor.annotation.OnShutdown.class, reportingTask, new MockComponentLogger(), context); } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java new file mode 100644 index 0000000..dd2f1b3 --- /dev/null +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockComponentLogger.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.documentation.mock; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.LogLevel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Stubs out the functionality of a ComponentLog so that it can + * be used during initialization of a component. + * + */ +public class MockComponentLogger implements ComponentLog { + + private static final Logger logger = LoggerFactory + .getLogger(MockComponentLogger.class); + + @Override + public void warn(String msg, Throwable t) { + logger.warn(msg, t); + } + + @Override + public void warn(String msg, Object[] os) { + logger.warn(msg, os); + } + + @Override + public void warn(String msg, Object[] os, Throwable t) { + logger.warn(msg, os); + logger.warn("", t); + } + + @Override + public void warn(String msg) { + logger.warn(msg); + } + + @Override + public void trace(String msg, Throwable t) { + logger.trace(msg, t); + } + + @Override + public void trace(String msg, Object[] os) { + logger.trace(msg, os); + } + + @Override + public void trace(String msg) { + logger.trace(msg); + } + + @Override + public void trace(String msg, Object[] os, Throwable t) { + logger.trace(msg, os); + logger.trace("", t); + } + + @Override + public boolean isWarnEnabled() { + return logger.isWarnEnabled(); + } + + @Override + public boolean isTraceEnabled() { + return logger.isTraceEnabled(); + } + + @Override + public boolean isInfoEnabled() { + return logger.isInfoEnabled(); + } + + @Override + public boolean isErrorEnabled() { + return logger.isErrorEnabled(); + } + + @Override + public boolean isDebugEnabled() { + return logger.isDebugEnabled(); + } + + @Override + public void info(String msg, Throwable t) { + logger.info(msg, t); + } + + @Override + public void info(String msg, Object[] os) { + logger.info(msg, os); + } + + @Override + public void info(String msg) { + logger.info(msg); + + } + + @Override + public void info(String msg, Object[] os, Throwable t) { + logger.trace(msg, os); + logger.trace("", t); + + } + + @Override + public String getName() { + return logger.getName(); + } + + @Override + public void error(String msg, Throwable t) { + logger.error(msg, t); + } + + @Override + public void error(String msg, Object[] os) { + logger.error(msg, os); + } + + @Override + public void error(String msg) { + logger.error(msg); + } + + @Override + public void error(String msg, Object[] os, Throwable t) { + logger.error(msg, os); + logger.error("", t); + } + + @Override + public void debug(String msg, Throwable t) { + logger.debug(msg, t); + } + + @Override + public void debug(String msg, Object[] os) { + logger.debug(msg, os); + } + + @Override + public void debug(String msg, Object[] os, Throwable t) { + logger.debug(msg, os); + logger.debug("", t); + } + + @Override + public void debug(String msg) { + logger.debug(msg); + } + + @Override + public void log(LogLevel level, String msg, Throwable t) { + switch (level) { + case DEBUG: + debug(msg, t); + break; + case ERROR: + case FATAL: + error(msg, t); + break; + case INFO: + info(msg, t); + break; + case TRACE: + trace(msg, t); + break; + case WARN: + warn(msg, t); + break; + } + } + + @Override + public void log(LogLevel level, String msg, Object[] os) { + switch (level) { + case DEBUG: + debug(msg, os); + break; + case ERROR: + case FATAL: + error(msg, os); + break; + case INFO: + info(msg, os); + break; + case TRACE: + trace(msg, os); + break; + case WARN: + warn(msg, os); + break; + } + } + + @Override + public void log(LogLevel level, String msg) { + switch (level) { + case DEBUG: + debug(msg); + break; + case ERROR: + case FATAL: + error(msg); + break; + case INFO: + info(msg); + break; + case TRACE: + trace(msg); + break; + case WARN: + warn(msg); + break; + } + } + + @Override + public void log(LogLevel level, String msg, Object[] os, Throwable t) { + switch (level) { + case DEBUG: + debug(msg, os, t); + break; + case ERROR: + case FATAL: + error(msg, os, t); + break; + case INFO: + info(msg, os, t); + break; + case TRACE: + trace(msg, os, t); + break; + case WARN: + warn(msg, os, t); + break; + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java index c1cca26..abbde61 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockControllerServiceInitializationContext.java @@ -41,7 +41,7 @@ public class MockControllerServiceInitializationContext implements ControllerSer @Override public ComponentLog getLogger() { - return new MockProcessorLogger(); + return new MockComponentLogger(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java index 4218250..e536471 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorInitializationContext.java @@ -35,7 +35,7 @@ public class MockProcessorInitializationContext implements ProcessorInitializati @Override public ComponentLog getLogger() { - return new MockProcessorLogger(); + return new MockComponentLogger(); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java deleted file mode 100644 index df9e673..0000000 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockProcessorLogger.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.documentation.mock; - -import org.apache.nifi.logging.ComponentLog; -import org.apache.nifi.logging.LogLevel; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Stubs out the functionality of a ProcessorLog/ComponentLog so that it can - * be used during initialization of a component. - * - */ -public class MockProcessorLogger implements ComponentLog { - - private static final Logger logger = LoggerFactory - .getLogger(MockProcessorLogger.class); - - @Override - public void warn(String msg, Throwable t) { - logger.warn(msg, t); - } - - @Override - public void warn(String msg, Object[] os) { - logger.warn(msg, os); - } - - @Override - public void warn(String msg, Object[] os, Throwable t) { - logger.warn(msg, os); - logger.warn("", t); - } - - @Override - public void warn(String msg) { - logger.warn(msg); - } - - @Override - public void trace(String msg, Throwable t) { - logger.trace(msg, t); - } - - @Override - public void trace(String msg, Object[] os) { - logger.trace(msg, os); - } - - @Override - public void trace(String msg) { - logger.trace(msg); - } - - @Override - public void trace(String msg, Object[] os, Throwable t) { - logger.trace(msg, os); - logger.trace("", t); - } - - @Override - public boolean isWarnEnabled() { - return logger.isWarnEnabled(); - } - - @Override - public boolean isTraceEnabled() { - return logger.isTraceEnabled(); - } - - @Override - public boolean isInfoEnabled() { - return logger.isInfoEnabled(); - } - - @Override - public boolean isErrorEnabled() { - return logger.isErrorEnabled(); - } - - @Override - public boolean isDebugEnabled() { - return logger.isDebugEnabled(); - } - - @Override - public void info(String msg, Throwable t) { - logger.info(msg, t); - } - - @Override - public void info(String msg, Object[] os) { - logger.info(msg, os); - } - - @Override - public void info(String msg) { - logger.info(msg); - - } - - @Override - public void info(String msg, Object[] os, Throwable t) { - logger.trace(msg, os); - logger.trace("", t); - - } - - @Override - public String getName() { - return logger.getName(); - } - - @Override - public void error(String msg, Throwable t) { - logger.error(msg, t); - } - - @Override - public void error(String msg, Object[] os) { - logger.error(msg, os); - } - - @Override - public void error(String msg) { - logger.error(msg); - } - - @Override - public void error(String msg, Object[] os, Throwable t) { - logger.error(msg, os); - logger.error("", t); - } - - @Override - public void debug(String msg, Throwable t) { - logger.debug(msg, t); - } - - @Override - public void debug(String msg, Object[] os) { - logger.debug(msg, os); - } - - @Override - public void debug(String msg, Object[] os, Throwable t) { - logger.debug(msg, os); - logger.debug("", t); - } - - @Override - public void debug(String msg) { - logger.debug(msg); - } - - @Override - public void log(LogLevel level, String msg, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, t); - break; - case ERROR: - case FATAL: - error(msg, t); - break; - case INFO: - info(msg, t); - break; - case TRACE: - trace(msg, t); - break; - case WARN: - warn(msg, t); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os) { - switch (level) { - case DEBUG: - debug(msg, os); - break; - case ERROR: - case FATAL: - error(msg, os); - break; - case INFO: - info(msg, os); - break; - case TRACE: - trace(msg, os); - break; - case WARN: - warn(msg, os); - break; - } - } - - @Override - public void log(LogLevel level, String msg) { - switch (level) { - case DEBUG: - debug(msg); - break; - case ERROR: - case FATAL: - error(msg); - break; - case INFO: - info(msg); - break; - case TRACE: - trace(msg); - break; - case WARN: - warn(msg); - break; - } - } - - @Override - public void log(LogLevel level, String msg, Object[] os, Throwable t) { - switch (level) { - case DEBUG: - debug(msg, os, t); - break; - case ERROR: - case FATAL: - error(msg, os, t); - break; - case INFO: - info(msg, os, t); - break; - case TRACE: - trace(msg, os, t); - break; - case WARN: - warn(msg, os, t); - break; - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java index abaa766..ebf59d6 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-documentation/src/main/java/org/apache/nifi/documentation/mock/MockReportingInitializationContext.java @@ -62,6 +62,6 @@ public class MockReportingInitializationContext implements ReportingInitializati @Override public ComponentLog getLogger() { - return new MockProcessorLogger(); + return new MockComponentLogger(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 4650c92..6b61b49 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -990,11 +990,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R Thread.currentThread().setContextClassLoader(detectedClassLoaderForType); final Class<? extends Processor> processorClass = rawClass.asSubclass(Processor.class); processor = processorClass.newInstance(); - final ComponentLog processorLogger = new SimpleProcessLogger(identifier, processor); - final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, processorLogger, this); + final ComponentLog componentLogger = new SimpleProcessLogger(identifier, processor); + final ProcessorInitializationContext ctx = new StandardProcessorInitializationContext(identifier, componentLogger, this); processor.initialize(ctx); - LogRepositoryFactory.getRepository(identifier).setLogger(processorLogger); + LogRepositoryFactory.getRepository(identifier).setLogger(componentLogger); return processor; } catch (final Throwable t) { throw new ProcessorInstantiationException(type, t); http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java index e185331..ccd731d 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/processor/StandardProcessorInitializationContext.java @@ -26,9 +26,9 @@ public class StandardProcessorInitializationContext implements ProcessorInitiali private final ComponentLog logger; private final ControllerServiceProvider serviceProvider; - public StandardProcessorInitializationContext(final String identifier, final ComponentLog processorLog, final ControllerServiceProvider serviceProvider) { + public StandardProcessorInitializationContext(final String identifier, final ComponentLog componentLog, final ControllerServiceProvider serviceProvider) { this.identifier = identifier; - this.logger = processorLog; + this.logger = componentLog; this.serviceProvider = serviceProvider; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java index 1b65af6..77ded36 100644 --- a/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java +++ b/nifi-nar-bundles/nifi-hive-bundle/nifi-hive-processors/src/main/java/org/apache/nifi/processors/hive/SelectHiveQL.java @@ -16,6 +16,19 @@ */ package org.apache.nifi.processors.hive; +import java.io.IOException; +import java.io.OutputStream; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.TimeUnit; + import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -27,7 +40,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.dbcp.hive.HiveDBCPService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; @@ -38,19 +51,6 @@ import org.apache.nifi.util.LongHolder; import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.hive.HiveJdbcCommon; -import java.io.IOException; -import java.io.OutputStream; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; - @EventDriven @InputRequirement(Requirement.INPUT_ALLOWED) @Tags({"hive", "sql", "select", "jdbc", "query", "database"}) @@ -149,7 +149,7 @@ public class SelectHiveQL extends AbstractHiveQLProcessor { } } - final ProcessorLog logger = getLogger(); + final ComponentLog logger = getLogger(); final HiveDBCPService dbcpService = context.getProperty(HIVE_DBCP_SERVICE).asControllerService(HiveDBCPService.class); final String selectQuery = context.getProperty(HIVEQL_SELECT_QUERY).evaluateAttributeExpressions(fileToProcess).getValue(); final String outputFormat = context.getProperty(HIVEQL_OUTPUT_FORMAT).getValue(); http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java index f42a892..79dfce7 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaPublisher.java @@ -31,7 +31,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.stream.io.util.StreamDemarcator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +48,7 @@ class KafkaPublisher implements Closeable { private volatile long ackWaitTime = 30000; - private volatile ProcessorLog processLog; + private volatile ComponentLog processLog; /** * Creates an instance of this class as well as the instance of the @@ -177,10 +177,10 @@ class KafkaPublisher implements Closeable { } /** - * Will set {@link ProcessorLog} as an additional logger to forward log + * Will set {@link ComponentLog} as an additional logger to forward log * messages to NiFi bulletin */ - void setProcessLog(ProcessorLog processLog) { + void setProcessLog(ComponentLog processLog) { this.processLog = processLog; } http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java deleted file mode 100644 index 6346ffd..0000000 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycelTest.java +++ /dev/null @@ -1,456 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.processors.kafka.pubsub; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.when; - -import java.io.Closeable; -import java.lang.reflect.Field; -import java.util.List; -import java.util.Random; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.ProcessSessionFactory; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.TestRunner; -import org.apache.nifi.util.TestRunners; -import org.junit.Ignore; -import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -public class AbstractKafkaProcessorLifecycelTest { - - private final static Random random = new Random(); - - @Test - public void validateBaseProperties() throws Exception { - TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class); - runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, ""); - runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo"); - runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); - - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo"); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid")); - } - runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234"); - - runner.removeProperty(ConsumeKafka.TOPIC); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("'topic' is invalid because topic is required")); - } - - runner.setProperty(ConsumeKafka.TOPIC, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.TOPIC, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - runner.setProperty(ConsumeKafka.TOPIC, "blah"); - - runner.removeProperty(ConsumeKafka.CLIENT_ID); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("invalid because client.id is required")); - } - - runner.setProperty(ConsumeKafka.CLIENT_ID, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - - runner.setProperty(ConsumeKafka.CLIENT_ID, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj"); - - runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, ""); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " "); - try { - runner.assertValid(); - fail(); - } catch (AssertionError e) { - assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); - } - } - - @Test - @Ignore // just for extra sanity check - public void validateConcurrencyWithRandomFailuresMultiple() throws Exception { - for (int i = 0; i < 100; i++) { - validateConcurrencyWithRandomFailures(); - } - } - - @Test - public void validateConcurrencyWithRandomFailures() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); - if (random.nextInt(10) == 5) { - when(context.getName()).thenReturn("fail"); - } - try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * this has to be <= 1 since the last thread may come to finally block - * after acceptTask flag has been reset at which point the close will - * not be called (which is correct behavior since it will be invoked - * explicitly by the life-cycle operations of a running processor). - * - * You can actually observe the =1 behavior in the next test where it is - * always 0 close calls - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertTrue(closeVsInitDiff <= 1); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - @Test - public void validateConcurrencyWithAllSuccesses() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); - try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * unlike previous test this one will always be 1 since there are no - * failures - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertEquals(1, closeVsInitDiff); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - @Test - public void validateConcurrencyWithAllFailures() throws Exception { - ExecutorService processingExecutor = Executors.newFixedThreadPool(32); - final AtomicInteger commitCounter = new AtomicInteger(); - final AtomicInteger rollbackCounter = new AtomicInteger(); - final AtomicInteger yieldCounter = new AtomicInteger(); - - final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); - final ProcessSession session = mock(ProcessSession.class); - when(sessionFactory.createSession()).thenReturn(session); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - commitCounter.incrementAndGet(); - return null; - } - }).when(session).commit(); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - rollbackCounter.incrementAndGet(); - return null; - } - }).when(session).rollback(true); - - final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); - - int testCount = 1000; - final CountDownLatch latch = new CountDownLatch(testCount); - for (int i = 0; i < testCount; i++) { - processingExecutor.execute(new Runnable() { - @Override - public void run() { - ProcessContext context = mock(ProcessContext.class); - doAnswer(new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - yieldCounter.incrementAndGet(); - return null; - } - }).when(context).yield(); - when(context.getName()).thenReturn("fail"); - try { - processor.onTrigger(context, sessionFactory); - } catch (Exception e) { - fail(); - } finally { - latch.countDown(); - } - } - }); - } - - assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); - processingExecutor.shutdown(); - - System.out.println("SUCCESS: " + processor.successfulTriggers); - System.out.println("FAILURE: " + processor.failedTriggers); - System.out.println("INIT: " + processor.resourceReinitialized); - System.out.println("YIELD CALLS: " + yieldCounter.get()); - System.out.println("COMMIT CALLS: " + commitCounter.get()); - System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); - System.out.println("Close CALLS: " + processor.closeCounter.get()); - - /* - * unlike previous test this one will always be 0 since all triggers are - * failures - */ - int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); - assertEquals(0, closeVsInitDiff); - - assertEquals(commitCounter.get(), processor.successfulTriggers.get()); - assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); - - assertEquals(testCount, - processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); - } - - /** - * - */ - public static class DummyProcessor extends AbstractKafkaProcessor<Closeable> { - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException { - return true; - } - - @Override - protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - return mock(Closeable.class); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return SHARED_DESCRIPTORS; - } - } - - - public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> { - final AtomicInteger failedTriggers = new AtomicInteger(); - final AtomicInteger successfulTriggers = new AtomicInteger(); - final AtomicInteger resourceReinitialized = new AtomicInteger(); - final AtomicInteger closeCounter = new AtomicInteger(); - - ConcurrencyValidatingProcessor() { - try { - Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger"); - loggerField.setAccessible(true); - loggerField.set(this, mock(ProcessorLog.class)); - } catch (Exception e) { - throw new IllegalStateException(e); - } - } - - @Override - @OnStopped - public void close() { - super.close(); - assertTrue(this.kafkaResource == null); - closeCounter.incrementAndGet(); - } - - @Override - protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) { - assertNotNull(this.kafkaResource); - if ("fail".equals(context.getName())) { - failedTriggers.incrementAndGet(); - throw new RuntimeException("Intentional"); - } - this.successfulTriggers.incrementAndGet(); - return true; - } - - @Override - protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { - this.resourceReinitialized.incrementAndGet(); - return mock(Closeable.class); - } - } -} http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java new file mode 100644 index 0000000..d09be60 --- /dev/null +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-pubsub-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/AbstractKafkaProcessorLifecycleTest.java @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.kafka.pubsub; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.io.Closeable; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.ProcessSessionFactory; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Ignore; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class AbstractKafkaProcessorLifecycleTest { + + private final static Random random = new Random(); + + @Test + public void validateBaseProperties() throws Exception { + TestRunner runner = TestRunners.newTestRunner(DummyProcessor.class); + runner.setProperty(AbstractKafkaProcessor.BOOTSTRAP_SERVERS, ""); + runner.setProperty(AbstractKafkaProcessor.TOPIC, "foo"); + runner.setProperty(ConsumeKafka.CLIENT_ID, "foo"); + + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo"); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("'bootstrap.servers' validated against 'foo' is invalid")); + } + runner.setProperty(ConsumeKafka.BOOTSTRAP_SERVERS, "foo:1234"); + + runner.removeProperty(ConsumeKafka.TOPIC); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("'topic' is invalid because topic is required")); + } + + runner.setProperty(ConsumeKafka.TOPIC, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.TOPIC, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(ConsumeKafka.TOPIC, "blah"); + + runner.removeProperty(ConsumeKafka.CLIENT_ID); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("invalid because client.id is required")); + } + + runner.setProperty(ConsumeKafka.CLIENT_ID, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + + runner.setProperty(ConsumeKafka.CLIENT_ID, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(ConsumeKafka.CLIENT_ID, "ghj"); + + runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, ""); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + runner.setProperty(PublishKafka.KERBEROS_PRINCIPLE, " "); + try { + runner.assertValid(); + fail(); + } catch (AssertionError e) { + assertTrue(e.getMessage().contains("must contain at least one character that is not white space")); + } + } + + @Test + @Ignore // just for extra sanity check + public void validateConcurrencyWithRandomFailuresMultiple() throws Exception { + for (int i = 0; i < 100; i++) { + validateConcurrencyWithRandomFailures(); + } + } + + @Test + public void validateConcurrencyWithRandomFailures() throws Exception { + ExecutorService processingExecutor = Executors.newFixedThreadPool(32); + final AtomicInteger commitCounter = new AtomicInteger(); + final AtomicInteger rollbackCounter = new AtomicInteger(); + final AtomicInteger yieldCounter = new AtomicInteger(); + + final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); + final ProcessSession session = mock(ProcessSession.class); + when(sessionFactory.createSession()).thenReturn(session); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + commitCounter.incrementAndGet(); + return null; + } + }).when(session).commit(); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + rollbackCounter.incrementAndGet(); + return null; + } + }).when(session).rollback(true); + + final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); + + int testCount = 1000; + final CountDownLatch latch = new CountDownLatch(testCount); + for (int i = 0; i < testCount; i++) { + processingExecutor.execute(new Runnable() { + @Override + public void run() { + ProcessContext context = mock(ProcessContext.class); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + yieldCounter.incrementAndGet(); + return null; + } + }).when(context).yield(); + if (random.nextInt(10) == 5) { + when(context.getName()).thenReturn("fail"); + } + try { + processor.onTrigger(context, sessionFactory); + } catch (Exception e) { + fail(); + } finally { + latch.countDown(); + } + } + }); + } + + assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); + processingExecutor.shutdown(); + + System.out.println("SUCCESS: " + processor.successfulTriggers); + System.out.println("FAILURE: " + processor.failedTriggers); + System.out.println("INIT: " + processor.resourceReinitialized); + System.out.println("YIELD CALLS: " + yieldCounter.get()); + System.out.println("COMMIT CALLS: " + commitCounter.get()); + System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); + System.out.println("Close CALLS: " + processor.closeCounter.get()); + + /* + * this has to be <= 1 since the last thread may come to finally block + * after acceptTask flag has been reset at which point the close will + * not be called (which is correct behavior since it will be invoked + * explicitly by the life-cycle operations of a running processor). + * + * You can actually observe the =1 behavior in the next test where it is + * always 0 close calls + */ + int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); + assertTrue(closeVsInitDiff <= 1); + + assertEquals(commitCounter.get(), processor.successfulTriggers.get()); + assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); + + assertEquals(testCount, + processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); + } + + @Test + public void validateConcurrencyWithAllSuccesses() throws Exception { + ExecutorService processingExecutor = Executors.newFixedThreadPool(32); + final AtomicInteger commitCounter = new AtomicInteger(); + final AtomicInteger rollbackCounter = new AtomicInteger(); + final AtomicInteger yieldCounter = new AtomicInteger(); + + final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); + final ProcessSession session = mock(ProcessSession.class); + when(sessionFactory.createSession()).thenReturn(session); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + commitCounter.incrementAndGet(); + return null; + } + }).when(session).commit(); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + rollbackCounter.incrementAndGet(); + return null; + } + }).when(session).rollback(true); + + final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); + + int testCount = 1000; + final CountDownLatch latch = new CountDownLatch(testCount); + for (int i = 0; i < testCount; i++) { + processingExecutor.execute(new Runnable() { + @Override + public void run() { + ProcessContext context = mock(ProcessContext.class); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + yieldCounter.incrementAndGet(); + return null; + } + }).when(context).yield(); + try { + processor.onTrigger(context, sessionFactory); + } catch (Exception e) { + fail(); + } finally { + latch.countDown(); + } + } + }); + } + + assertTrue(latch.await(30000, TimeUnit.MILLISECONDS)); + processingExecutor.shutdown(); + + System.out.println("SUCCESS: " + processor.successfulTriggers); + System.out.println("FAILURE: " + processor.failedTriggers); + System.out.println("INIT: " + processor.resourceReinitialized); + System.out.println("YIELD CALLS: " + yieldCounter.get()); + System.out.println("COMMIT CALLS: " + commitCounter.get()); + System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); + System.out.println("Close CALLS: " + processor.closeCounter.get()); + + /* + * unlike previous test this one will always be 1 since there are no + * failures + */ + int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); + assertEquals(1, closeVsInitDiff); + + assertEquals(commitCounter.get(), processor.successfulTriggers.get()); + assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); + + assertEquals(testCount, + processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); + } + + @Test + public void validateConcurrencyWithAllFailures() throws Exception { + ExecutorService processingExecutor = Executors.newFixedThreadPool(32); + final AtomicInteger commitCounter = new AtomicInteger(); + final AtomicInteger rollbackCounter = new AtomicInteger(); + final AtomicInteger yieldCounter = new AtomicInteger(); + + final ProcessSessionFactory sessionFactory = mock(ProcessSessionFactory.class); + final ProcessSession session = mock(ProcessSession.class); + when(sessionFactory.createSession()).thenReturn(session); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + commitCounter.incrementAndGet(); + return null; + } + }).when(session).commit(); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + rollbackCounter.incrementAndGet(); + return null; + } + }).when(session).rollback(true); + + final ConcurrencyValidatingProcessor processor = spy(new ConcurrencyValidatingProcessor()); + + int testCount = 1000; + final CountDownLatch latch = new CountDownLatch(testCount); + for (int i = 0; i < testCount; i++) { + processingExecutor.execute(new Runnable() { + @Override + public void run() { + ProcessContext context = mock(ProcessContext.class); + doAnswer(new Answer<Void>() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + yieldCounter.incrementAndGet(); + return null; + } + }).when(context).yield(); + when(context.getName()).thenReturn("fail"); + try { + processor.onTrigger(context, sessionFactory); + } catch (Exception e) { + fail(); + } finally { + latch.countDown(); + } + } + }); + } + + assertTrue(latch.await(20000, TimeUnit.MILLISECONDS)); + processingExecutor.shutdown(); + + System.out.println("SUCCESS: " + processor.successfulTriggers); + System.out.println("FAILURE: " + processor.failedTriggers); + System.out.println("INIT: " + processor.resourceReinitialized); + System.out.println("YIELD CALLS: " + yieldCounter.get()); + System.out.println("COMMIT CALLS: " + commitCounter.get()); + System.out.println("ROLLBACK CALLS: " + rollbackCounter.get()); + System.out.println("Close CALLS: " + processor.closeCounter.get()); + + /* + * unlike previous test this one will always be 0 since all triggers are + * failures + */ + int closeVsInitDiff = processor.resourceReinitialized.get() - processor.closeCounter.get(); + assertEquals(0, closeVsInitDiff); + + assertEquals(commitCounter.get(), processor.successfulTriggers.get()); + assertEquals(rollbackCounter.get(), processor.failedTriggers.get()); + + assertEquals(testCount, + processor.successfulTriggers.get() + processor.failedTriggers.get() + yieldCounter.get()); + } + + /** + * + */ + public static class DummyProcessor extends AbstractKafkaProcessor<Closeable> { + @Override + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) throws ProcessException { + return true; + } + + @Override + protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { + return mock(Closeable.class); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return SHARED_DESCRIPTORS; + } + } + + + public static class ConcurrencyValidatingProcessor extends AbstractKafkaProcessor<Closeable> { + final AtomicInteger failedTriggers = new AtomicInteger(); + final AtomicInteger successfulTriggers = new AtomicInteger(); + final AtomicInteger resourceReinitialized = new AtomicInteger(); + final AtomicInteger closeCounter = new AtomicInteger(); + + ConcurrencyValidatingProcessor() { + try { + Field loggerField = AbstractSessionFactoryProcessor.class.getDeclaredField("logger"); + loggerField.setAccessible(true); + loggerField.set(this, mock(ComponentLog.class)); + } catch (Exception e) { + throw new IllegalStateException(e); + } + } + + @Override + @OnStopped + public void close() { + super.close(); + assertTrue(this.kafkaResource == null); + closeCounter.incrementAndGet(); + } + + @Override + protected boolean rendezvousWithKafka(ProcessContext context, ProcessSession session) { + assertNotNull(this.kafkaResource); + if ("fail".equals(context.getName())) { + failedTriggers.incrementAndGet(); + throw new RuntimeException("Intentional"); + } + this.successfulTriggers.incrementAndGet(); + return true; + } + + @Override + protected Closeable buildKafkaResource(ProcessContext context, ProcessSession session) throws ProcessException { + this.resourceReinitialized.incrementAndGet(); + return mock(Closeable.class); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 224c55f..b863a42 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -30,8 +30,6 @@ import javax.script.ScriptException; import javax.script.SimpleBindings; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.lifecycle.OnStopped; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.annotation.lifecycle.OnScheduled; http://git-wip-us.apache.org/repos/asf/nifi/blob/1bd2cf0d/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java index c8576aa..675b135 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TransformJSON.java @@ -42,7 +42,7 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -57,13 +57,13 @@ import org.apache.nifi.util.StopWatch; import org.apache.nifi.util.StringUtils; import com.bazaarvoice.jolt.CardinalityTransform; -import com.bazaarvoice.jolt.Shiftr; -import com.bazaarvoice.jolt.Removr; import com.bazaarvoice.jolt.Chainr; import com.bazaarvoice.jolt.Defaultr; +import com.bazaarvoice.jolt.JsonUtils; +import com.bazaarvoice.jolt.Removr; +import com.bazaarvoice.jolt.Shiftr; import com.bazaarvoice.jolt.Sortr; import com.bazaarvoice.jolt.Transform; -import com.bazaarvoice.jolt.JsonUtils; @EventDriven @SideEffectFree @@ -175,7 +175,7 @@ public class TransformJSON extends AbstractProcessor { return; } - final ProcessorLog logger = getLogger(); + final ComponentLog logger = getLogger(); final StopWatch stopWatch = new StopWatch(true); final byte[] originalContent = new byte[(int) original.getSize()];
