[ https://issues.apache.org/jira/browse/NIFI-3518?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368459#comment-16368459 ]
ASF GitHub Bot commented on NIFI-3518: -------------------------------------- Github user binhnv commented on a diff in the pull request: https://github.com/apache/nifi/pull/2028#discussion_r168942406 --- Diff: nifi-nar-bundles/nifi-morphlines-bundle/nifi-morphlines-processors/src/main/java/org/apache/nifi/processors/morphlines/ExecuteMorphline.java --- @@ -0,0 +1,253 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.morphlines; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.behavior.DynamicProperty; +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; +import org.kitesdk.morphline.api.Command; +import org.kitesdk.morphline.api.MorphlineContext; +import org.kitesdk.morphline.api.Record; +import org.kitesdk.morphline.base.Fields; +import org.kitesdk.morphline.base.Compiler; +import org.kitesdk.morphline.base.Notifications; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; +import java.util.Iterator; +import java.util.ArrayList; +import java.util.Map; +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicLong; + +@Tags({"kitesdk", "morphlines", "ETL", "HDFS", "avro", "Solr", "HBase"}) +@CapabilityDescription("Executes Morphlines (http://kitesdk.org/docs/1.1.0/morphlines/) framework, which performs in-memory container of transformation " + + "commands in oder to perform tasks such as loading, parsing, transforming, or otherwise processing a 
single record.") +@DynamicProperty(name = "Relationship Name", value = "A Regular Expression", supportsExpressionLanguage = true, description = "Adds the dynamic property key and value " + + "as key-value pair to Morphlines content.") +@Restricted("Provides operator the ability to read/write to any file that NiFi has access to.") + +public class ExecuteMorphline extends AbstractProcessor { + public static final PropertyDescriptor MORPHLINES_ID = new PropertyDescriptor + .Builder().name("Morphlines ID") + .description("Identifier of the morphlines context") + .required(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_FILE = new PropertyDescriptor + .Builder().name("Morphlines File") + .description("File for the morphlines context") + .required(true) + .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR) + .expressionLanguageSupported(true) + .build(); + + public static final PropertyDescriptor MORPHLINES_OUTPUT_FIELD = new PropertyDescriptor + .Builder().name("Morphlines output field") + .description("Field name of output in Morphlines. 
Default is '_attachment_body'.") + .required(false) + .defaultValue("_attachment_body") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("Relationship for success.") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Relationship for failure of morphlines.") + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("Relationship for original flowfiles.") + .build(); + + private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor>builder() + .add(MORPHLINES_FILE) + .add(MORPHLINES_ID) + .add(MORPHLINES_OUTPUT_FIELD) + .build(); + + private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship>builder() + .add(REL_SUCCESS) + .add(REL_FAILURE) + .build(); + + public PropertyValue morphlinesFileProperty; + public PropertyValue morphlinesIdProperty; + public PropertyValue morphlinesOutputFieldProperty; + public Map<String, PropertyValue> dynamicPropertyMap = new HashMap(); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return PROPERTIES; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .required(false) + .name(propertyDescriptorName) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .dynamic(true) + .expressionLanguageSupported(true) + .build(); + } + + @OnScheduled + public void onScheduled(ProcessContext context) throws Exception { + morphlinesFileProperty = context.getProperty(MORPHLINES_FILE); + morphlinesIdProperty = context.getProperty(MORPHLINES_ID); + morphlinesOutputFieldProperty = 
context.getProperty(MORPHLINES_OUTPUT_FIELD); + for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { + if (descriptor.isDynamic()) { + dynamicPropertyMap.put(descriptor.getName(), context.getProperty(descriptor)); + } + } + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if ( flowFile == null ) { + return; + } + + FlowFile originalFlowFile = session.clone(flowFile); + final AtomicLong written = new AtomicLong(0L); + final byte[] value = new byte[(int) flowFile.getSize()]; --- End diff -- This is not going to work for big files > Create a Morphlines processor > ----------------------------- > > Key: NIFI-3518 > URL: https://issues.apache.org/jira/browse/NIFI-3518 > Project: Apache NiFi > Issue Type: New Feature > Reporter: William Nouet > Priority: Minor > Attachments: NIFI-3518-versionupdates.patch > > > Create a dedicated processor to run Morphlines transformations > (http://kitesdk.org/docs/1.1.0/morphlines/morphlines-reference-guide.html) -- This message was sent by Atlassian JIRA (v7.6.3#76005)