Repository: nifi
Updated Branches:
  refs/heads/master 1811ba568 -> bf15502e1


NIFI-3763 Add new processor to log user defined messages built with NiFi 
Expression Language

Signed-off-by: Matt Burgess <[email protected]>

This closes #1737


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bf15502e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bf15502e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bf15502e

Branch: refs/heads/master
Commit: bf15502e1994c68d706a2a2749495b57fd9a11e4
Parents: 1811ba5
Author: Peter G. Horvath <[email protected]>
Authored: Mon May 15 18:10:26 2017 +0200
Committer: Matt Burgess <[email protected]>
Committed: Mon May 15 14:18:57 2017 -0400

----------------------------------------------------------------------
 .../src/main/resources/conf/logback.xml         |   1 +
 .../nifi/processors/standard/LogMessage.java    | 211 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestLogMessage.java     | 130 ++++++++++++
 4 files changed, 343 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bf15502e/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
index f2da200..da1adf0 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-resources/src/main/resources/conf/logback.xml
@@ -86,6 +86,7 @@
     <logger name="org.apache.nifi" level="INFO"/>
     <logger name="org.apache.nifi.processors" level="WARN"/>
     <logger name="org.apache.nifi.processors.standard.LogAttribute" 
level="INFO"/>
+    <logger name="org.apache.nifi.processors.standard.LogMessage" 
level="INFO"/>
     <logger 
name="org.apache.nifi.controller.repository.StandardProcessSession" 
level="WARN" />
     
     

http://git-wip-us.apache.org/repos/asf/nifi/blob/bf15502e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java
new file mode 100644
index 0000000..2d6b318
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/LogMessage.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.jetty.util.StringUtil;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"attributes", "logging"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Emits a log message at the specified log level")
+public class LogMessage extends AbstractProcessor {
+
+    public static final PropertyDescriptor LOG_LEVEL = new 
PropertyDescriptor.Builder()
+            .name("log-level")
+            .displayName("Log Level")
+            .required(true)
+            .description("The Log Level to use when logging the message")
+            .allowableValues(MessageLogLevel.values())
+            .defaultValue(MessageLogLevel.info.toString())
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor LOG_PREFIX = new 
PropertyDescriptor.Builder()
+            .name("log-prefix")
+            .displayName("Log prefix")
+            .required(false)
+            .description("Log prefix appended to the log lines. " +
+                    "It helps to distinguish the output of multiple LogMessage 
processors.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor LOG_MESSAGE = new 
PropertyDescriptor.Builder()
+            .name("log-message")
+            .displayName("Log message")
+            .required(false)
+            .description("The log message to emit")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("All FlowFiles are routed to this relationship")
+            .build();
+
+    private static final int CHUNK_SIZE = 50;
+
+    enum MessageLogLevel {
+
+        trace, debug, info, warn, error
+    }
+
+    private Set<Relationship> relationships;
+    private List<PropertyDescriptor> supportedDescriptors;
+
+    @Override
+    protected void init(final ProcessorInitializationContext context) {
+        final Set<Relationship> procRels = new HashSet<>();
+        procRels.add(REL_SUCCESS);
+        relationships = Collections.unmodifiableSet(procRels);
+
+        // descriptors
+        final List<PropertyDescriptor> supDescriptors = new ArrayList<>();
+        supDescriptors.add(LOG_LEVEL);
+        supDescriptors.add(LOG_PREFIX);
+        supDescriptors.add(LOG_MESSAGE);
+        supportedDescriptors = Collections.unmodifiableList(supDescriptors);
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return supportedDescriptors;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) {
+
+        final String logLevelValue = 
context.getProperty(LOG_LEVEL).getValue().toLowerCase();
+
+        final MessageLogLevel logLevel;
+        try {
+            logLevel = MessageLogLevel.valueOf(logLevelValue);
+        } catch (Exception e) {
+            throw new ProcessException(e);
+        }
+
+        final ComponentLog logger = getLogger();
+        boolean isLogLevelEnabled = false;
+        switch (logLevel) {
+            case trace:
+                isLogLevelEnabled = logger.isTraceEnabled();
+                break;
+            case debug:
+                isLogLevelEnabled = logger.isDebugEnabled();
+                break;
+            case info:
+                isLogLevelEnabled = logger.isInfoEnabled();
+                break;
+            case warn:
+                isLogLevelEnabled = logger.isWarnEnabled();
+                break;
+            case error:
+                isLogLevelEnabled = logger.isErrorEnabled();
+                break;
+        }
+
+        if (!isLogLevelEnabled) {
+            transferChunk(session);
+            return;
+        }
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        processFlowFile(logger, logLevel, flowFile, context);
+        session.transfer(flowFile, REL_SUCCESS);
+    }
+
+    private void processFlowFile(
+            final ComponentLog logger,
+            final MessageLogLevel logLevel,
+            final FlowFile flowFile,
+            final ProcessContext context) {
+
+        String logPrefix = 
context.getProperty(LOG_PREFIX).evaluateAttributeExpressions(flowFile).getValue();
+        String logMessage = 
context.getProperty(LOG_MESSAGE).evaluateAttributeExpressions(flowFile).getValue();
+
+        String messageToWrite;
+        if (StringUtil.isBlank(logPrefix)) {
+            messageToWrite = logMessage;
+        } else {
+            messageToWrite = String.format("%s%s", logPrefix, logMessage);
+        }
+
+        // Uses optional property to specify logging level
+        switch (logLevel) {
+            case info:
+                logger.info(messageToWrite);
+                break;
+            case debug:
+                logger.debug(messageToWrite);
+                break;
+            case warn:
+                logger.warn(messageToWrite);
+                break;
+            case trace:
+                logger.trace(messageToWrite);
+                break;
+            case error:
+                logger.error(messageToWrite);
+                break;
+            default:
+                logger.debug(messageToWrite);
+        }
+    }
+
+    private void transferChunk(final ProcessSession session) {
+        final List<FlowFile> flowFiles = session.get(CHUNK_SIZE);
+        if (!flowFiles.isEmpty()) {
+            session.transfer(flowFiles, REL_SUCCESS);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/bf15502e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 221fe0a..f345524 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -58,6 +58,7 @@ org.apache.nifi.processors.standard.ListenTCP
 org.apache.nifi.processors.standard.ListenUDP
 org.apache.nifi.processors.standard.ListSFTP
 org.apache.nifi.processors.standard.LogAttribute
+org.apache.nifi.processors.standard.LogMessage
 org.apache.nifi.processors.standard.MergeContent
 org.apache.nifi.processors.standard.ModifyBytes
 org.apache.nifi.processors.standard.MonitorActivity

http://git-wip-us.apache.org/repos/asf/nifi/blob/bf15502e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java
new file mode 100644
index 0000000..98a8952
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestLogMessage.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.MockComponentLog;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+
+public class TestLogMessage {
+
+    private TestableLogMessage testableLogMessage;
+    private TestRunner runner;
+
+    private static class TestableLogMessage extends LogMessage {
+
+        MockComponentLog getMockComponentLog() {
+            ComponentLog mockLog = getLogger();
+
+            if (!(mockLog instanceof MockComponentLog)) {
+                throw new IllegalStateException("Logger is expected to be 
MockComponentLog, but was: " +
+                        mockLog.getClass());
+            }
+
+            return (MockComponentLog) mockLog;
+        }
+
+
+    }
+
+    @Before
+    public void before() throws InitializationException {
+        testableLogMessage = new TestableLogMessage();
+        runner = TestRunners.newTestRunner(testableLogMessage);
+
+    }
+
+    @After
+    public void after() throws InitializationException {
+        runner.shutdown();
+    }
+
+    @Test
+    public void testInfoMessageLogged() throws InitializationException, 
IOException {
+
+        runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the 
operator to follow the flow: ${foobar}");
+        runner.setProperty(LogMessage.LOG_LEVEL, 
LogMessage.MessageLogLevel.info.toString());
+
+        HashMap<String, String> flowAttributes = new HashMap<>();
+        flowAttributes.put("foobar", "baz");
+
+        runner.enqueue("This is a message!", flowAttributes);
+        runner.setValidateExpressionUsage(false);
+
+        runner.run();
+
+        List<MockFlowFile> successFlowFiles = 
runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS);
+        Assert.assertEquals(1, successFlowFiles.size());
+
+        MockComponentLog mockComponentLog = 
testableLogMessage.getMockComponentLog();
+        List<org.apache.nifi.util.LogMessage> infoMessages = 
mockComponentLog.getInfoMessages();
+        Assert.assertEquals(1, infoMessages.size());
+        Assert.assertTrue(infoMessages.get(0).getMsg()
+                .endsWith("This should help the operator to follow the flow: 
baz"));
+
+
+        Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty());
+    }
+
+    @Test
+    public void testInfoMessageWithPrefixLogged() throws 
InitializationException, IOException {
+
+        runner.setProperty(LogMessage.LOG_PREFIX, "FOOBAR>>>");
+        runner.setProperty(LogMessage.LOG_MESSAGE, "This should help the 
operator to follow the flow: ${foobar}");
+        runner.setProperty(LogMessage.LOG_LEVEL, 
LogMessage.MessageLogLevel.info.toString());
+
+        HashMap<String, String> flowAttributes = new HashMap<>();
+        flowAttributes.put("foobar", "baz");
+
+        runner.enqueue("This is a message!", flowAttributes);
+        runner.setValidateExpressionUsage(false);
+
+        runner.run();
+
+        List<MockFlowFile> successFlowFiles = 
runner.getFlowFilesForRelationship(LogMessage.REL_SUCCESS);
+        Assert.assertEquals(1, successFlowFiles.size());
+
+        MockComponentLog mockComponentLog = 
testableLogMessage.getMockComponentLog();
+        List<org.apache.nifi.util.LogMessage> infoMessages = 
mockComponentLog.getInfoMessages();
+        Assert.assertEquals(1, infoMessages.size());
+        Assert.assertTrue(infoMessages.get(0).getMsg()
+                .endsWith("FOOBAR>>>This should help the operator to follow 
the flow: baz"));
+
+
+
+        Assert.assertTrue(mockComponentLog.getTraceMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getDebugMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getWarnMessages().isEmpty());
+        Assert.assertTrue(mockComponentLog.getErrorMessages().isEmpty());
+    }
+
+}

Reply via email to