[
https://issues.apache.org/jira/browse/NIFI-3147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15878690#comment-15878690
]
ASF GitHub Bot commented on NIFI-3147:
--------------------------------------
Github user jfrazee commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1312#discussion_r102510825
--- Diff:
nifi-nar-bundles/nifi-ccda-bundle/nifi-ccda-processors/src/main/java/org/apache/nifi/processors/ccda/ExtractCCDAAttributes.java
---
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.ccda;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.apache.commons.jexl3.JexlBuilder;
+import org.apache.commons.jexl3.JexlContext;
+import org.apache.commons.jexl3.JexlEngine;
+import org.apache.commons.jexl3.JexlExpression;
+import org.apache.commons.jexl3.MapContext;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.eclipse.emf.common.util.Diagnostic;
+import org.openhealthtools.mdht.uml.cda.CDAPackage;
+import org.openhealthtools.mdht.uml.cda.ClinicalDocument;
+import org.openhealthtools.mdht.uml.cda.ccd.CCDPackage;
+import org.openhealthtools.mdht.uml.cda.consol.ConsolPackage;
+import org.openhealthtools.mdht.uml.cda.hitsp.HITSPPackage;
+import org.openhealthtools.mdht.uml.cda.ihe.IHEPackage;
+import org.openhealthtools.mdht.uml.cda.util.CDAUtil;
+import org.openhealthtools.mdht.uml.cda.util.CDAUtil.ValidationHandler;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"CCDA", "healthcare", "extract", "JSON", "attributes"})
+@CapabilityDescription("Extracts information from an Consolidated CDA
formatted FlowFile and provides JSON as FlowFile content "
+ + "and individual attributes as FlowFile attributes. The
attributes are named as <Parent> <dot> <Key>. "
+ + "If the Parent is repeating, the naming will be <Parent>
<underscore> <Parent Index> <dot> <Key>. "
+ + "For example, section.act_07.observation.name=Essential
hypertension")
+public class ExtractCCDAAttributes extends AbstractProcessor {
+
+ private static final String APPLICATION_JSON = "application/json";
+ private static final char FIELD_SEPARATOR = '@';
+ private static final char KEY_VALUE_SEPARATOR = '#';
+
+ private Map<String, Map<String, String>> processMap = new
LinkedHashMap<String, Map<String, String>>(); // stores mapping data for Parser
+ private List<String> timingStats = new ArrayList<String>(); // stores
timing statistics
+ private Map<String, String> attributes = new TreeMap<String,
String>(); // stores CDA attributes
+ private JexlEngine jexl = null; // JEXL Engine to execute code for
mapping
+ private JexlContext jexlCtx = null; // JEXL Context to hold element
being processed
+
+ private List<PropertyDescriptor> properties;
+ private Set<Relationship> relationships;
+
+ /**
+ * SKIP_VALIDATION - Indicates whether to validate the CDA document
after loading.
+ * if true and the document is not valid, then ProcessException will
be thrown
+ */
+ public static final PropertyDescriptor SKIP_VALIDATION = new
PropertyDescriptor.Builder().name("skip-validation")
+ .displayName("Skip Validation").description("Whether or not to
validate CDA message values").required(true)
+ .allowableValues("true",
"false").defaultValue("true").addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+ * PRETTY_PRINTING - Choice of whether generated JSON in the FileFlow
content is
+ * pretty printed with new line and tabs or just a continuous string
of characters
+ */
+ public static final PropertyDescriptor PRETTY_PRINTING = new
PropertyDescriptor.Builder().name("pretty-printing")
+ .displayName("Pretty Printing").description("Whether or not to
Pretty Print JSON output").required(true)
+ .allowableValues("true",
"false").defaultValue("false").addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ /**
+ * REL_SUCCESS - Value to be returned in case the processor succeeds
+ */
+ public static final Relationship REL_SUCCESS = new
Relationship.Builder()
+ .name("success")
+ .description("A FlowFile is routed to this relationship if it
is properly parsed as CDA and its content stored in FlowFile")
+ .build();
+
+ /**
+ * REL_FAILURE - Value to be returned in case the processor fails
+ */
+ public static final Relationship REL_FAILURE = new
Relationship.Builder()
+ .name("failure")
+ .description("A FlowFile is routed to this relationship if it
cannot be parse CDA and store content to FlowFile.")
+ .build();
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return relationships;
+ }
+
+ @Override
+ protected void init(final ProcessorInitializationContext context) {
+ getLogger().info("Loading packages");
+
+ relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+
+ properties = new ArrayList<>();
+ properties.add(SKIP_VALIDATION);
+ properties.add(PRETTY_PRINTING);
+
+ long start = System.currentTimeMillis();
+
+ // Load required MDHT packages
+ System.setProperty(
"org.eclipse.emf.ecore.EPackage.Registry.INSTANCE",
+ "org.eclipse.emf.ecore.impl.EPackageRegistryImpl" );
+ CDAPackage.eINSTANCE.eClass();
+ HITSPPackage.eINSTANCE.eClass();
+ CCDPackage.eINSTANCE.eClass();
+ ConsolPackage.eINSTANCE.eClass();
+ IHEPackage.eINSTANCE.eClass();
+ timingStats.add(String.format("Loaded packages in %d ms",
System.currentTimeMillis() - start));
+
+ // Initialize JEXL
+ jexl = new
JexlBuilder().cache(1024).debug(false).silent(true).strict(false).create();
+ jexlCtx = new MapContext();
+
+ getLogger().info("Loading mappings");
+ loadMappings(); // Load CDA mappings for parser
+
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final
ProcessSession session) {
+ getLogger().info("Processing CCDA");
+
+ FlowFile flowFile = session.get();
+ if ( flowFile == null ) {
+ return;
+ }
+
+ if(processMap.isEmpty()) {
+ getLogger().error("Process Mapping is not loaded");
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final Boolean skipValidation =
context.getProperty(SKIP_VALIDATION).asBoolean();
+ final Boolean prettyPrinting =
context.getProperty(PRETTY_PRINTING).asBoolean();
+
+ long start = System.currentTimeMillis(), total = start;
+
+ ClinicalDocument cd = loadDocument(session.read(flowFile),
skipValidation); // Load and optionally validate CDA document
+
+ start = System.currentTimeMillis();
+ getLogger().info("Processing elements");
+ Map<String, Object> elements = processElement(null, cd); //
Process CDA element using mapping data
+
+ start = System.currentTimeMillis();
+ getLogger().info("Generating JSON");
+
+ // Initialize GSON with optional pretty printing
+ Gson gson = prettyPrinting ? new
GsonBuilder().setPrettyPrinting().create() : new GsonBuilder().create();
+ String json = gson.toJson(elements);
+ timingStats.add(String.format("JSON created in %d ms",
System.currentTimeMillis() - start));
+ timingStats.add(String.format("CCDA Processed in %d ms",
System.currentTimeMillis() - total));
+
+ logDetails(json);
+
+ flowFile = session.write(flowFile, new OutputStreamCallback() {
+
+ @Override
+ public void process(OutputStream output) throws IOException {
+ output.write(json.getBytes("UTF-8"));
+ }
+ });
+ flowFile = session.putAttribute(flowFile,
CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ session.transfer(flowFile, REL_SUCCESS);
+
+ }
+
+ /**
+ * Process elements children based on the parser mapping.
+ * Any String values are added to attributes and JSON map
+ * For List, the processList method is called to iterate and process
+ * For an Object this method is called recursively
+ * While adding to the attributes the key is prefixed by parent
+ * @param parent Parent key for this element, used as a prefix for
attribute key
+ * @param element Element to be processed
+ * @return Map of processed data, value can contain String or
Map of Strings
+ */
+ protected Map<String, Object> processElement(String parent, Object
element) {
+ long start = System.currentTimeMillis();
+
+ Map<String, Object> map = new LinkedHashMap<String, Object>();
+ String name = element.getClass().getName();
+ Map<String, String> jexlMap = (HashMap<String, String>)
processMap.get(name); // get JEXL mappings for this element
+ getLogger().debug("Processing " + name);
+
+ if (jexlMap == null) {
+ getLogger().info("Missing mapping for element " + name);
+ return null;
+ }
+ for (Entry<String, String> entry : jexlMap.entrySet()) { //
evaluate JEXL for each child element
+ jexlCtx.set("element", element);
+ JexlExpression jexlExpr =
jexl.createExpression(entry.getValue());
+ Object value = jexlExpr.evaluate(jexlCtx);
+ String key = entry.getKey();
+ String prefix = parent != null ? parent + "." + key : key;
+ addElement(map, prefix, key, value);
+ }
+ timingStats.add(String.format("Processed %s in %d ms", name,
System.currentTimeMillis() - start));
+ return map;
+ }
+
+ /**
+ * Adds element to the JSON map and attribute list based on the type
+ * @param map JSON map
+ * @param prefix parent key as prefix
+ * @param key element key
+ * @param value element value
+ */
+ protected void addElement(Map<String, Object> map, String prefix,
String key, Object value) {
+ // if the value is a String, add it to final attribute list
+ // else process it further until we have a String representation
+ if (value instanceof String) {
+ if(value != null && !((String) value).isEmpty()) {
+ map.put(key, value);
+ attributes.put(prefix, (String) value);
+ }
+ } else if (value instanceof List) {
+ if(value != null && !((List) value).isEmpty()) {
+ map.put(key, processList(prefix, (List) value));
+ }
+ } else if (value != null) { // process element further
+ map.put(key, processElement(prefix, value));
+ }
+ }
+
+ /**
+ * Iterate through the list and calls processElement to process each
element
+ * @param key key used while calling processElement
+ * @param value value is the individual Object being processed
+ * @return list of elements
+ */
+ protected List<Object> processList(String key, List value) {
+ List<Object> items = new ArrayList<Object>();
+ String keyFormat = value.size() > 1 ? "%s_%02d" : "%s";
+ for (Object item : value) { // iterate over all elements and
process each element
+ items.add(processElement(String.format(keyFormat, key,
items.size() + 1), item));
+ }
+ return items;
+ }
+
+ protected ClinicalDocument loadDocument(InputStream inputStream,
Boolean skipValidation) {
+ long start = System.currentTimeMillis();
+ ClinicalDocument cd = null;
+
+ try {
+ getLogger().info("Loading document");
+ cd = CDAUtil.load(inputStream); // load CDA document
+ if (!skipValidation && !CDAUtil.validate(cd, new
CDAValidationHandler())) { //optional validation
+ getLogger().error("Failed to validate CDA document");
+ throw new ProcessException("Failed to validate CDA
document");
+ }
+ } catch (Exception e) {
+ getLogger().error("Failed to load CDA document", e);
+ throw new ProcessException("Failed to load CDA document", e);
+ }
+ timingStats.add(String.format("Loaded document in %d ms",
System.currentTimeMillis() - start));
+ return cd;
+ }
+
+ protected void loadMappings() {
+ long start = System.currentTimeMillis();
+ ClassLoader classloader =
Thread.currentThread().getContextClassLoader();
+ Properties mappings = new Properties();
+ try (InputStream is =
classloader.getResourceAsStream("mapping.properties")){
+ mappings.load(is);
+ // each child element is key#value and multiple elements are
separated by @
+ for (String property : mappings.stringPropertyNames()) {
+ String[] variables =
StringUtils.split(mappings.getProperty(property), FIELD_SEPARATOR);
+ Map<String, String> map = new LinkedHashMap<String,
String>();
+ for (String variable : variables) {
+ String[] keyvalue = StringUtils.split(variable,
KEY_VALUE_SEPARATOR);
+ map.put(keyvalue[0], keyvalue[1]);
+ }
+ processMap.put(property, map);
+ }
+
+ } catch (IOException e) {
+ getLogger().error("Failed to load mappings", e);
+ throw new ProcessException("Failed to load mappings", e);
+ }
+
+ timingStats.add(String.format("Loaded mappings in %d ms",
System.currentTimeMillis() - start));
+ }
+
+ private void logDetails(String json) {
+
+ if (getLogger().isTraceEnabled()) {
+ for (String timing : timingStats) {
+ getLogger().trace(timing);
+ }
+ }
+
+ if (getLogger().isDebugEnabled()) {
+ for (Entry<String, Map<String, String>> entry :
processMap.entrySet()) {
+ getLogger().debug(String.format("Mapping: %s=%s",
entry.getKey(), entry.getValue()));
+ }
+ for (Entry<String, String> entry : attributes.entrySet()) {
+ getLogger().debug(String.format("Attribute: %s=%s",
entry.getKey(), entry.getValue()));
+ }
+ getLogger().debug(json);
+ }
+ }
+
+ protected class CDAValidationHandler implements ValidationHandler {
+ @Override
+ public void handleError(Diagnostic diagnostic) {
+ getLogger().error(new StringBuilder("ERROR:
").append(diagnostic.getMessage()).toString());
+ }
+
+ @Override
+ public void handleWarning(Diagnostic diagnostic) {
+ getLogger().warn(new StringBuilder("WARNING:
").append(diagnostic.getMessage()).toString());
+ }
+
+ @Override
+ public void handleInfo(Diagnostic diagnostic) {
+ getLogger().info(new StringBuilder("INFO:
").append(diagnostic.getMessage()).toString());
+ }
+ }
+
+ @Override
--- End diff --
Since you're not doing anything different in equals() and hashCode(), you
can just remove these overrides.
> Build processor to parse CCDA into attributes and JSON
> ------------------------------------------------------
>
> Key: NIFI-3147
> URL: https://issues.apache.org/jira/browse/NIFI-3147
> Project: Apache NiFi
> Issue Type: New Feature
> Components: Extensions
> Reporter: Kedar Chitale
> Labels: attributes, ccda, healthcare, json, parser
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> Accept a CCDA document and parse it to create JSON text and individual
> attributes, for example code.codeSystemName=LOINC.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)