[ https://issues.apache.org/jira/browse/NIFI-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878690#comment-15878690 ]

ASF GitHub Bot commented on NIFI-3147:
--------------------------------------

Github user jfrazee commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1312#discussion_r102510825
  
    --- Diff: nifi-nar-bundles/nifi-ccda-bundle/nifi-ccda-processors/src/main/java/org/apache/nifi/processors/ccda/ExtractCCDAAttributes.java ---
    @@ -0,0 +1,383 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.nifi.processors.ccda;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Map.Entry;
    +import java.util.Properties;
    +import java.util.Set;
    +import java.util.TreeMap;
    +
    +import org.apache.commons.jexl3.JexlBuilder;
    +import org.apache.commons.jexl3.JexlContext;
    +import org.apache.commons.jexl3.JexlEngine;
    +import org.apache.commons.jexl3.JexlExpression;
    +import org.apache.commons.jexl3.MapContext;
    +import org.apache.commons.lang3.StringUtils;
    +import org.apache.nifi.annotation.behavior.InputRequirement;
    +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
    +import org.apache.nifi.annotation.behavior.SideEffectFree;
    +import org.apache.nifi.annotation.behavior.SupportsBatching;
    +import org.apache.nifi.annotation.documentation.CapabilityDescription;
    +import org.apache.nifi.annotation.documentation.Tags;
    +import org.apache.nifi.components.PropertyDescriptor;
    +import org.apache.nifi.flowfile.FlowFile;
    +import org.apache.nifi.flowfile.attributes.CoreAttributes;
    +import org.apache.nifi.processor.AbstractProcessor;
    +import org.apache.nifi.processor.ProcessContext;
    +import org.apache.nifi.processor.ProcessSession;
    +import org.apache.nifi.processor.ProcessorInitializationContext;
    +import org.apache.nifi.processor.Relationship;
    +import org.apache.nifi.processor.exception.ProcessException;
    +import org.apache.nifi.processor.io.OutputStreamCallback;
    +import org.apache.nifi.processor.util.StandardValidators;
    +import org.eclipse.emf.common.util.Diagnostic;
    +import org.openhealthtools.mdht.uml.cda.CDAPackage;
    +import org.openhealthtools.mdht.uml.cda.ClinicalDocument;
    +import org.openhealthtools.mdht.uml.cda.ccd.CCDPackage;
    +import org.openhealthtools.mdht.uml.cda.consol.ConsolPackage;
    +import org.openhealthtools.mdht.uml.cda.hitsp.HITSPPackage;
    +import org.openhealthtools.mdht.uml.cda.ihe.IHEPackage;
    +import org.openhealthtools.mdht.uml.cda.util.CDAUtil;
    +import org.openhealthtools.mdht.uml.cda.util.CDAUtil.ValidationHandler;
    +
    +import com.google.gson.Gson;
    +import com.google.gson.GsonBuilder;
    +
    +@SideEffectFree
    +@SupportsBatching
    +@InputRequirement(Requirement.INPUT_REQUIRED)
    +@Tags({"CCDA", "healthcare", "extract", "JSON", "attributes"})
    +@CapabilityDescription("Extracts information from an Consolidated CDA 
formatted FlowFile and provides JSON as FlowFile content "
    +        + "and individual attributes as FlowFile attributes. The 
attributes are named as <Parent> <dot> <Key>. "
    +        + "If the Parent is repeating, the naming will be <Parent> 
<underscore> <Parent Index> <dot> <Key>. "
    +        + "For example, section.act_07.observation.name=Essential 
hypertension")
    +public class ExtractCCDAAttributes extends AbstractProcessor {
    +
    +    private static final String APPLICATION_JSON = "application/json";
    +    private static final char FIELD_SEPARATOR = '@';
    +    private static final char KEY_VALUE_SEPARATOR = '#';
    +
    +    private Map<String, Map<String, String>> processMap = new 
LinkedHashMap<String, Map<String, String>>(); // stores mapping data for Parser
    +    private List<String> timingStats = new ArrayList<String>(); // stores 
timing statistics
    +    private Map<String, String> attributes = new TreeMap<String, 
String>(); // stores CDA attributes
    +    private JexlEngine jexl = null; // JEXL Engine to execute code for 
mapping
    +    private JexlContext jexlCtx = null; // JEXL Context to hold element 
being processed
    +
    +    private List<PropertyDescriptor> properties;
    +    private Set<Relationship> relationships;
    +
    +    /**
    +     * SKIP_VALIDATION - Indicates whether to validate the CDA document 
after loading.
    +     * if true and the document is not valid, then ProcessException will 
be thrown
    +     */
    +    public static final PropertyDescriptor SKIP_VALIDATION = new 
PropertyDescriptor.Builder().name("skip-validation")
    +            .displayName("Skip Validation").description("Whether or not to 
validate CDA message values").required(true)
    +            .allowableValues("true", 
"false").defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * PRETTY_PRINTING - Choice of whether generated JSON in the FileFlow 
content is
    +     * pretty printed with new line and tabs or just a continuous string 
of characters
    +     */
    +    public static final PropertyDescriptor PRETTY_PRINTING = new 
PropertyDescriptor.Builder().name("pretty-printing")
    +            .displayName("Pretty Printing").description("Whether or not to 
Pretty Print JSON output").required(true)
    +            .allowableValues("true", 
"false").defaultValue("false").addValidator(StandardValidators.BOOLEAN_VALIDATOR)
    +            .build();
    +
    +    /**
    +     * REL_SUCCESS - Value to be returned in case the processor succeeds
    +     */
    +    public static final Relationship REL_SUCCESS = new 
Relationship.Builder()
    +            .name("success")
    +            .description("A FlowFile is routed to this relationship if it 
is properly parsed as CDA and its content stored in FlowFile")
    +            .build();
    +
    +    /**
    +     * REL_FAILURE - Value to be returned in case the processor fails
    +     */
    +    public static final Relationship REL_FAILURE = new 
Relationship.Builder()
    +            .name("failure")
    +            .description("A FlowFile is routed to this relationship if it 
cannot be parse CDA and store content to FlowFile.")
    +            .build();
    +
    +    @Override
    +    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
    +        return properties;
    +    }
    +
    +    @Override
    +    public Set<Relationship> getRelationships() {
    +        return relationships;
    +    }
    +
    +    @Override
    +    protected void init(final ProcessorInitializationContext context) {
    +        getLogger().info("Loading packages");
    +
    +        relationships = new HashSet<>();
    +        relationships.add(REL_SUCCESS);
    +        relationships.add(REL_FAILURE);
    +
    +        properties = new ArrayList<>();
    +        properties.add(SKIP_VALIDATION);
    +        properties.add(PRETTY_PRINTING);
    +
    +        long start = System.currentTimeMillis();
    +
    +        // Load required MDHT packages
    +        System.setProperty( 
"org.eclipse.emf.ecore.EPackage.Registry.INSTANCE",
    +                "org.eclipse.emf.ecore.impl.EPackageRegistryImpl" );
    +        CDAPackage.eINSTANCE.eClass();
    +        HITSPPackage.eINSTANCE.eClass();
    +        CCDPackage.eINSTANCE.eClass();
    +        ConsolPackage.eINSTANCE.eClass();
    +        IHEPackage.eINSTANCE.eClass();
    +        timingStats.add(String.format("Loaded packages in %d ms", 
System.currentTimeMillis() - start));
    +
    +        // Initialize JEXL
    +        jexl = new 
JexlBuilder().cache(1024).debug(false).silent(true).strict(false).create();
    +        jexlCtx = new MapContext();
    +
    +        getLogger().info("Loading mappings");
    +        loadMappings(); // Load CDA mappings for parser
    +
    +    }
    +
    +    @Override
    +    public void onTrigger(final ProcessContext context, final 
ProcessSession session) {
    +        getLogger().info("Processing CCDA");
    +
    +        FlowFile flowFile = session.get();
    +        if ( flowFile == null ) {
    +            return;
    +        }
    +
    +        if(processMap.isEmpty()) {
    +            getLogger().error("Process Mapping is not loaded");
    +            session.transfer(flowFile, REL_FAILURE);
    +            return;
    +        }
    +
    +        final Boolean skipValidation = 
context.getProperty(SKIP_VALIDATION).asBoolean();
    +        final Boolean prettyPrinting = 
context.getProperty(PRETTY_PRINTING).asBoolean();
    +
    +        long start = System.currentTimeMillis(), total = start;
    +
    +        ClinicalDocument cd = loadDocument(session.read(flowFile), 
skipValidation); // Load and optionally validate CDA document
    +
    +        start = System.currentTimeMillis();
    +        getLogger().info("Processing elements");
    +        Map<String, Object> elements = processElement(null, cd); // 
Process CDA element using mapping data
    +
    +        start = System.currentTimeMillis();
    +        getLogger().info("Generating JSON");
    +
    +        // Initialize GSON with optional pretty printing
    +        Gson gson = prettyPrinting ? new 
GsonBuilder().setPrettyPrinting().create() : new GsonBuilder().create();
    +        String json = gson.toJson(elements);
    +        timingStats.add(String.format("JSON created in %d ms", 
System.currentTimeMillis() - start));
    +        timingStats.add(String.format("CCDA Processed in %d ms", 
System.currentTimeMillis() - total));
    +
    +        logDetails(json);
    +
    +        flowFile = session.write(flowFile, new OutputStreamCallback() {
    +
    +            @Override
    +            public void process(OutputStream output) throws IOException {
    +                output.write(json.getBytes("UTF-8"));
    +            }
    +        });
    +        flowFile = session.putAttribute(flowFile, 
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
    +        flowFile = session.putAllAttributes(flowFile, attributes);
    +
    +        session.transfer(flowFile, REL_SUCCESS);
    +
    +    }
    +
    +    /**
    +     * Process elements children based on the parser mapping.
    +     * Any String values are added to attributes and JSON map
    +     * For List, the processList method is called to iterate and process
    +     * For an Object this method is called recursively
    +     * While adding to the attributes the key is prefixed by parent
    +     * @param parent    Parent key for this element, used as a prefix for 
attribute key
    +     * @param element   Element to be processed
    +     * @return          Map of processed data, value can contain String or 
Map of Strings
    +     */
    +    protected Map<String, Object> processElement(String parent, Object 
element) {
    +        long start = System.currentTimeMillis();
    +
    +        Map<String, Object> map = new LinkedHashMap<String, Object>();
    +        String name = element.getClass().getName();
    +        Map<String, String> jexlMap = (HashMap<String, String>) 
processMap.get(name); // get JEXL mappings for this element
    +        getLogger().debug("Processing " + name);
    +
    +        if (jexlMap == null) {
    +            getLogger().info("Missing mapping for element " + name);
    +            return null;
    +        }
    +        for (Entry<String, String> entry : jexlMap.entrySet()) { // 
evaluate JEXL for each child element
    +            jexlCtx.set("element", element);
    +            JexlExpression jexlExpr = 
jexl.createExpression(entry.getValue());
    +            Object value = jexlExpr.evaluate(jexlCtx);
    +            String key = entry.getKey();
    +            String prefix = parent != null ? parent + "." + key : key;
    +            addElement(map, prefix, key, value);
    +        }
    +        timingStats.add(String.format("Processed %s in %d ms", name, 
System.currentTimeMillis() - start));
    +        return map;
    +    }
    +
    +    /**
    +     * Adds element to the JSON map and attribute list based on the type
    +     * @param map       JSON map
    +     * @param prefix    parent key as prefix
    +     * @param key       element key
    +     * @param value     element value
    +     */
    +    protected void addElement(Map<String, Object> map, String prefix, 
String key, Object value) {
    +        // if the value is a String, add it to final attribute list
    +        // else process it further until we have a String representation
    +        if (value instanceof String) {
    +            if(value != null && !((String) value).isEmpty()) {
    +                map.put(key, value);
    +                attributes.put(prefix, (String) value);
    +            }
    +        } else if (value instanceof List) {
    +            if(value != null && !((List) value).isEmpty()) {
    +                map.put(key, processList(prefix, (List) value));
    +            }
    +        } else if (value != null) { // process element further
    +            map.put(key, processElement(prefix, value));
    +        }
    +    }
    +
    +    /**
    +     * Iterate through the list and calls processElement to process each 
element
    +     * @param key       key used while calling processElement
    +     * @param value     value is the individual Object being processed
    +     * @return          list of elements
    +     */
    +    protected List<Object> processList(String key, List value) {
    +        List<Object> items = new ArrayList<Object>();
    +        String keyFormat = value.size() > 1 ? "%s_%02d" : "%s";
    +        for (Object item : value) { // iterate over all elements and 
process each element
    +            items.add(processElement(String.format(keyFormat, key, 
items.size() + 1), item));
    +        }
    +        return items;
    +    }
    +
    +    protected ClinicalDocument loadDocument(InputStream inputStream, 
Boolean skipValidation) {
    +        long start = System.currentTimeMillis();
    +        ClinicalDocument cd = null;
    +
    +        try {
    +            getLogger().info("Loading document");
    +            cd = CDAUtil.load(inputStream); // load CDA document
    +            if (!skipValidation && !CDAUtil.validate(cd, new 
CDAValidationHandler())) { //optional validation
    +                getLogger().error("Failed to validate CDA document");
    +                throw new ProcessException("Failed to validate CDA 
document");
    +            }
    +        } catch (Exception e) {
    +            getLogger().error("Failed to load CDA document", e);
    +            throw new ProcessException("Failed to load CDA document", e);
    +        }
    +        timingStats.add(String.format("Loaded document in %d ms", 
System.currentTimeMillis() - start));
    +        return cd;
    +    }
    +
    +    protected void loadMappings() {
    +        long start = System.currentTimeMillis();
    +        ClassLoader classloader = 
Thread.currentThread().getContextClassLoader();
    +        Properties mappings = new Properties();
    +        try (InputStream is = 
classloader.getResourceAsStream("mapping.properties")){
    +            mappings.load(is);
    +            // each child element is key#value and multiple elements are 
separated by @
    +            for (String property : mappings.stringPropertyNames()) {
    +                String[] variables = 
StringUtils.split(mappings.getProperty(property), FIELD_SEPARATOR);
    +                Map<String, String> map = new LinkedHashMap<String, 
String>();
    +                for (String variable : variables) {
    +                    String[] keyvalue = StringUtils.split(variable, 
KEY_VALUE_SEPARATOR);
    +                    map.put(keyvalue[0], keyvalue[1]);
    +                }
    +                processMap.put(property, map);
    +            }
    +
    +        } catch (IOException e) {
    +            getLogger().error("Failed to load mappings", e);
    +            throw new ProcessException("Failed to load mappings", e);
    +        }
    +
    +        timingStats.add(String.format("Loaded mappings in %d ms", 
System.currentTimeMillis() - start));
    +    }
    +
    +    private void logDetails(String json) {
    +
    +        if (getLogger().isTraceEnabled()) {
    +            for (String timing : timingStats) {
    +                getLogger().trace(timing);
    +            }
    +        }
    +
    +        if (getLogger().isDebugEnabled()) {
    +            for (Entry<String, Map<String, String>> entry : 
processMap.entrySet()) {
    +                getLogger().debug(String.format("Mapping: %s=%s", 
entry.getKey(), entry.getValue()));
    +            }
    +            for (Entry<String, String> entry : attributes.entrySet()) {
    +                getLogger().debug(String.format("Attribute: %s=%s", 
entry.getKey(), entry.getValue()));
    +            }
    +            getLogger().debug(json);
    +        }
    +    }
    +
    +    protected class CDAValidationHandler implements ValidationHandler {
    +        @Override
    +        public void handleError(Diagnostic diagnostic) {
    +            getLogger().error(new StringBuilder("ERROR: 
").append(diagnostic.getMessage()).toString());
    +        }
    +
    +        @Override
    +        public void handleWarning(Diagnostic diagnostic) {
    +            getLogger().warn(new StringBuilder("WARNING: 
").append(diagnostic.getMessage()).toString());
    +        }
    +
    +        @Override
    +        public void handleInfo(Diagnostic diagnostic) {
    +            getLogger().info(new StringBuilder("INFO: 
").append(diagnostic.getMessage()).toString());
    +        }
    +    }
    +
    +    @Override
    --- End diff --
    
    Since you're not doing anything different in equals() and hashCode(), you can just remove these overrides.


> Build processor to parse CCDA into attributes and JSON
> ------------------------------------------------------
>
>                 Key: NIFI-3147
>                 URL: https://issues.apache.org/jira/browse/NIFI-3147
>             Project: Apache NiFi
>          Issue Type: New Feature
>          Components: Extensions
>            Reporter: Kedar Chitale
>              Labels: attributes, ccda, healthcare, json, parser
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Accept a CCDA document and parse it to create JSON text and individual
> attributes, for example code.codeSystemName=LOINC.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to