GitHub user WilliamNouet opened a pull request:

    https://github.com/apache/nifi/pull/2028

    NIFI-3518 Create a Morphlines processor

    Thank you for submitting a contribution to Apache NiFi.
    
    In order to streamline the review of the contribution we ask you
    to ensure the following steps have been taken:
    
    For all changes:
    
    [Y] Is there a JIRA ticket associated with this PR? Is it referenced
    in the commit message?
    
    [Y] Does your PR title start with NIFI-XXXX where XXXX is the JIRA number 
you are trying to resolve? Pay particular attention to the hyphen "-" character.
    
    [Y] Has your PR been rebased against the latest commit within the target 
branch (typically master)?
    
    [Y] Is your initial contribution a single, squashed commit?
    
    For code changes:
    
    [Y] Have you ensured that the full suite of tests is executed via mvn 
-Pcontrib-check clean install at the root nifi folder?
    [Y] Have you written or updated unit tests to verify your changes?
    [N/A] If adding new dependencies to the code, are these dependencies 
licensed in a way that is compatible for inclusion under ASF 2.0?
    [N/A] If applicable, have you updated the LICENSE file, including the main 
LICENSE file under nifi-assembly?
    [N/A] If applicable, have you updated the NOTICE file, including the main 
NOTICE file found under nifi-assembly?
    [N/A] If adding new Properties, have you added .displayName in addition to 
.name (programmatic access) for each of the new properties?
    
    For documentation related changes:
    
    [N/A] Have you ensured that format looks appropriate for the output in 
which it is rendered?
    
    Note:
    
    Please ensure that once the PR is submitted, you check travis-ci for build 
issues and submit an update to your PR as soon as possible.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/WilliamNouet/nifi NIFI-3518

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/nifi/pull/2028.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2028
    
----
commit bddb035a58fdf8542ceb9808e5ca3c0eb61baf9c
Author: WilliamNouet <[email protected]>
Date:   2017-07-21T16:34:31Z

    NIFI-3518 Create a Morphlines processor
    
    diff --git 
a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml
    new file mode 100644
    index 0000000..afb93b8
    --- /dev/null
    +++ b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-nar/pom.xml
    @@ -0,0 +1,41 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-morphlines-bundle</artifactId>
    +        <version>1.4.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-morphlines-nar</artifactId>
    +    <version>1.4.0-SNAPSHOT</version>
    +    <packaging>nar</packaging>
    +    <properties>
    +        <maven.javadoc.skip>true</maven.javadoc.skip>
    +        <source.skip>true</source.skip>
    +    </properties>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-morphlines-processors</artifactId>
    +            <version>1.4.0-SNAPSHOT</version>
    +        </dependency>
    +    </dependencies>
    +
    +</project>
    diff --git 
a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml
    new file mode 100644
    index 0000000..c9ebb31
    --- /dev/null
    +++ 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/pom.xml
    @@ -0,0 +1,61 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-morphlines-bundle</artifactId>
    +        <version>1.4.0-SNAPSHOT</version>
    +    </parent>
    +
    +    <artifactId>nifi-morphlines-processors</artifactId>
    +    <packaging>jar</packaging>
    +
    +    <dependencies>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-api</artifactId>
    +            <version>${nifi.version}</version>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-processor-utils</artifactId>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.apache.nifi</groupId>
    +            <artifactId>nifi-mock</artifactId>
    +            <version>${nifi.version}</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.slf4j</groupId>
    +            <artifactId>slf4j-simple</artifactId>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>junit</groupId>
    +            <artifactId>junit</artifactId>
    +            <version>4.11</version>
    +            <scope>test</scope>
    +        </dependency>
    +        <dependency>
    +            <groupId>org.kitesdk</groupId>
    +            <artifactId>kite-morphlines-core</artifactId>
    +            <version>${kite.version}</version>
    +        </dependency>
    +    </dependencies>
    +</project>
    diff --git 
a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java
 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java
    new file mode 100644
    index 0000000..56bf66f
    --- /dev/null
    +++ 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ImplementMorphlines.java
    @@ -0,0 +1,253 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.morphlines;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.components.PropertyValue;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.annotation.behavior.DynamicProperty;
    +import org.apache.nifi.annotation.lifecycle.OnScheduled;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.InputStreamCallback;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.io.StreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.apache.nifi.stream.io.StreamUtils;
    +import org.kitesdk.morphline.api.Command;
    +import org.kitesdk.morphline.api.MorphlineContext;
    +import org.kitesdk.morphline.api.Record;
    +import org.kitesdk.morphline.base.Fields;
    +import org.kitesdk.morphline.base.Compiler;
    +import org.kitesdk.morphline.base.Notifications;
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.ImmutableSet;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.List;
    +import java.util.Set;
    +import java.util.Iterator;
    +import java.util.ArrayList;
    +import java.util.Map;
    +import java.util.HashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
    +@Tags({"kitesdk", "morphlines", "ETL", "HDFS", "avro", "Solr", "HBase"})
    +@CapabilityDescription("Implements the Morphlines 
(http://kitesdk.org/docs/1.1.0/morphlines/) framework, which provides an in-memory 
container of transformation "
    +    + "commands in order to perform tasks such as loading, parsing, 
transforming, or otherwise processing a single record.")
    +@DynamicProperty(name = "Relationship Name", value = "A Regular 
Expression", supportsExpressionLanguage = true, description = "Adds the dynamic 
property key and value "
    +    + "as key-value pair to Morphlines content.")
    +
    +public class ImplementMorphlines extends AbstractProcessor {
    +    public static final PropertyDescriptor MORPHLINES_ID = new 
PropertyDescriptor
    +        .Builder().name("Morphlines ID")
    +        .description("Identifier of the morphlines context")
    +        .required(true)
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .expressionLanguageSupported(true)
    +        .build();
    +
    +    public static final PropertyDescriptor MORPHLINES_FILE = new 
PropertyDescriptor
    +        .Builder().name("Morphlines File")
    +        .description("File for the morphlines context")
    +        .required(true)
    +        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
    +        .expressionLanguageSupported(true)
    +        .build();
    +
    +    public static final PropertyDescriptor MORPHLINES_OUTPUT_FIELD = new 
PropertyDescriptor
    +        .Builder().name("Morphlines output field")
    +        .description("Field name of output in Morphlines. Default is 
'_attachment_body'.")
    +        .required(false)
    +        .defaultValue("_attachment_body")
    +        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +        .build();
    +
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +        .name("success")
    +        .description("Relationship for success.")
    +        .build();
    +
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +        .name("failure")
    +        .description("Relationship for failure of morphlines.")
    +        .build();
    +
    +    public static final Relationship REL_ORIGINAL = new 
Relationship.Builder()
    +        .name("original")
    +        .description("Relationship for original flowfiles.")
    +        .build();
    +
    +    private static final List<PropertyDescriptor> PROPERTIES = 
ImmutableList.<PropertyDescriptor>builder()
    +        .add(MORPHLINES_FILE)
    +        .add(MORPHLINES_ID)
    +        .add(MORPHLINES_OUTPUT_FIELD)
    +        .build();
    +
    +    private static final Set<Relationship> RELATIONSHIPS = 
ImmutableSet.<Relationship>builder()
    +        .add(REL_SUCCESS)
    +        .add(REL_FAILURE)
    +        .add(REL_ORIGINAL)
    +        .build();
    +
    +    public PropertyValue morphlinesFileProperty;
    +    public PropertyValue morphlinesIdProperty;
    +    public PropertyValue morphlinesOutputFieldProperty;
    +    public Map<String, PropertyValue> dynamicPropertyMap = new HashMap<>();
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return RELATIONSHIPS;
    +    }
    +
    +    @Override
    +    public final List<PropertyDescriptor> 
getSupportedPropertyDescriptors() {
    +        return PROPERTIES;
    +    }
    +
    +    @Override
    +    protected PropertyDescriptor 
getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    +        return new PropertyDescriptor.Builder()
    +            .required(false)
    +            .name(propertyDescriptorName)
    +            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
    +            .dynamic(true)
    +            .expressionLanguageSupported(true)
    +            .build();
    +    }
    +
    +    @OnScheduled
    +    public void onScheduled(ProcessContext context) throws Exception {
    +        morphlinesFileProperty = context.getProperty(MORPHLINES_FILE);
    +        morphlinesIdProperty = context.getProperty(MORPHLINES_ID);
    +        morphlinesOutputFieldProperty = 
context.getProperty(MORPHLINES_OUTPUT_FIELD);
    +        for (final PropertyDescriptor descriptor : 
context.getProperties().keySet()) {
    +            if (descriptor.isDynamic()) {
    +                dynamicPropertyMap.put(descriptor.getName(), 
context.getProperty(descriptor));
    +            }
    +        }
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) throws ProcessException {
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        FlowFile originalFlowFile = session.clone(flowFile);
    +        final AtomicLong written = new AtomicLong(0L);
    +        final byte[] value = new byte[(int) flowFile.getSize()];
    +
    +        final File morphlinesFile = new 
File(morphlinesFileProperty.evaluateAttributeExpressions(flowFile).getValue());
    +        final String morphlinesId = 
morphlinesIdProperty.evaluateAttributeExpressions(flowFile).getValue();
    +        final String morphlinesOutputField = 
morphlinesOutputFieldProperty.evaluateAttributeExpressions(flowFile).getValue();
    +        Map<String, Object> settings = new HashMap<>();
    +        for (final String descriptorName : dynamicPropertyMap.keySet()) {
    +            final PropertyValue dynamicPropertyValue = 
dynamicPropertyMap.get(descriptorName);
    +            settings.put(descriptorName, 
dynamicPropertyValue.evaluateAttributeExpressions(flowFile).getValue());
    +        }
    +        final MorphlineContext morphlineContext = new 
MorphlineContext.Builder().setSettings(settings).build();
    +
    +        final Collector collectorRecord = new Collector();
    +        final Command morphline = new Compiler().compile(morphlinesFile, 
morphlinesId, morphlineContext, collectorRecord);
    +
    +        try{
    +            flowFile = session.write(flowFile, new StreamCallback() {
    +                @Override
    +                public void process(InputStream in, OutputStream out) 
throws IOException {
    +                    Record record = new Record();
    +                    StreamUtils.fillBuffer(in, value);
    +                    record.put(Fields.ATTACHMENT_BODY, value);
    +                    Notifications.notifyStartSession(morphline);
    +                    if (morphline.process(record)) {
    +                        List<Record> results = 
collectorRecord.getRecords();
    +                        for (Iterator<Record> it = results.iterator(); 
it.hasNext();) {
    +                            Record result = it.next();
    +                            if 
(result.getFirstValue(morphlinesOutputField) != null) {
    +                                String outputValue = 
result.getFirstValue(morphlinesOutputField).toString() + "\n";
    +                                out.write(outputValue.getBytes());
    +                                written.incrementAndGet();
    +                            } else {
    +                                getLogger().warn(String.format("Unable to 
get %s within processed record: %s", morphlinesOutputField, result.toString()));
    +                            }
    +                        }
    +                        Notifications.notifyCommitTransaction(morphline);
    +                    }
    +                }
    +            });
    +
    +            if (written.get() > 0L) {
    +                // false to only update if file transfer is successful
    +                session.adjustCounter("Processed records in morphlines", 
written.get(), false);
    +                session.transfer(flowFile, REL_SUCCESS);
    +                session.transfer(originalFlowFile, REL_ORIGINAL);
    +            } else {
    +                getLogger().warn(String.format("Morphlines transformations 
did not match any of the input records for %s Morphlines ID", morphlinesId));
    +                session.transfer(flowFile, REL_ORIGINAL);
    +                session.transfer(originalFlowFile, REL_FAILURE);
    +            }
    +        } catch (ProcessException e) {
    +            getLogger().error("Error while processing the flowFile through 
Morphlines", e);
    +            session.transfer(flowFile, REL_ORIGINAL);
    +            session.transfer(originalFlowFile, REL_FAILURE);
    +            Notifications.notifyRollbackTransaction(morphline);
    +            morphlineContext.getExceptionHandler().handleException(e, 
null);
    +        }
    +        Notifications.notifyShutdown(morphline);
    +    }
    +
    +    private static final class Collector implements Command {
    +
    +        private volatile List<Record> results = new ArrayList<>();
    +
    +        public List<Record> getRecords() {
    +            return results;
    +        }
    +
    +        public void reset() {
    +            results.clear();
    +        }
    +
    +        @Override
    +        public Command getParent() {
    +            return null;
    +        }
    +
    +        @Override
    +        public void notify(Record notification) {
    +        }
    +
    +        @Override
    +        public boolean process(Record record) {
    +            results.add(record);
    +            return true;
    +        }
    +
    +        public int getRecordCount() {
    +            return results.size();
    +        }
    +    }
    +}
    diff --git 
a/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
    new file mode 100644
    index 0000000..65b2511
    --- /dev/null
    +++ 
b/nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
    @@ -0,0 +1 @@
    +org.apache.nifi.processors.morphlines.ImplementMorphlines
    diff --git a/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml 
b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml
    new file mode 100644
    index 0000000..c4a9abb
    --- /dev/null
    +++ b/nifi-nar-bundles/nifi-morphlines-bundle/pom.xml
    @@ -0,0 +1,41 @@
    +<?xml version="1.0" encoding="UTF-8"?>
    +<!--
    +  Licensed to the Apache Software Foundation (ASF) under one or more
    +  contributor license agreements. See the NOTICE file distributed with
    +  this work for additional information regarding copyright ownership.
    +  The ASF licenses this file to You under the Apache License, Version 2.0
    +  (the "License"); you may not use this file except in compliance with
    +  the License. You may obtain a copy of the License at
    +  http://www.apache.org/licenses/LICENSE-2.0
    +  Unless required by applicable law or agreed to in writing, software
    +  distributed under the License is distributed on an "AS IS" BASIS,
    +  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    +  See the License for the specific language governing permissions and
    +  limitations under the License.
    +-->
    +<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
    +    <modelVersion>4.0.0</modelVersion>
    +
    +    <parent>
    +        <groupId>org.apache.nifi</groupId>
    +        <artifactId>nifi-nar-bundles</artifactId>
    +        <version>1.2.0</version>
    +    </parent>
    +
    +    <groupId>org.apache.nifi</groupId>
    +    <artifactId>nifi-morphlines-bundle</artifactId>
    +    <version>1.4.0-SNAPSHOT</version>
    +    <packaging>pom</packaging>
    +
    +    <modules>
    +        <module>nifi-morphlines-processors</module>
    +        <module>nifi-morphlines-nar</module>
    +    </modules>
    +
    +    <properties>
    +        <kite.version>1.1.0</kite.version>
    +        <nifi.version>1.2.0</nifi.version>
    +        <skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
    +    </properties>
    +
    +</project>
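
The output loop in ImplementMorphlines.onTrigger walks the collected records, takes the first value of the configured output field from each one, and writes it followed by a newline. The sketch below is not part of the pull request; it illustrates that loop in isolation, without the Kite or NiFi dependencies. The class name OutputFieldSketch, the method joinFirstValues, and the modelling of a record as a map from field name to a list of values are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the processor's output loop; not part of the PR.
public class OutputFieldSketch {

    // Each "record" is modelled as a map from field name to a list of values,
    // loosely mirroring a multi-valued Morphlines Record (an assumption).
    static String joinFirstValues(List<Map<String, List<Object>>> records, String field) {
        StringBuilder out = new StringBuilder();
        // A for-each loop advances the iterator exactly once per record,
        // so no record is skipped while reading its field.
        for (Map<String, List<Object>> record : records) {
            List<Object> values = record.get(field);
            if (values == null || values.isEmpty()) {
                // The processor logs a warning at this point; the sketch just skips.
                continue;
            }
            out.append(values.get(0)).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, List<Object>> a = new LinkedHashMap<>();
        a.put("_attachment_body", Arrays.<Object>asList("first"));

        Map<String, List<Object>> b = new LinkedHashMap<>();
        b.put("other", Arrays.<Object>asList("ignored"));

        Map<String, List<Object>> c = new LinkedHashMap<>();
        c.put("_attachment_body", Arrays.<Object>asList("third", "extra"));

        // Records without the output field contribute nothing to the result.
        System.out.print(joinFirstValues(Arrays.asList(a, b, c), "_attachment_body"));
    }
}
```

Reading the field from the record already obtained by the loop, rather than advancing the iterator a second time, is what keeps every record in the output and avoids running past the end of the list.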

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
