Repository: incubator-nifi Updated Branches: refs/heads/develop c14b2cebf -> 5a559e455
NIFI-271 Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/5a559e45 Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/5a559e45 Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/5a559e45 Branch: refs/heads/develop Commit: 5a559e455761666fc3e1680a3dad934b58d687f2 Parents: c14b2ce Author: joewitt <[email protected]> Authored: Sat Apr 25 08:38:28 2015 -0400 Committer: joewitt <[email protected]> Committed: Sat Apr 25 08:38:28 2015 -0400 ---------------------------------------------------------------------- .../nifi-hl7-bundle/nifi-hl7-processors/pom.xml | 114 +++--- .../processors/hl7/ExtractHL7Attributes.java | 365 +++++++++---------- .../apache/nifi/processors/hl7/RouteHL7.java | 300 +++++++-------- .../hl7/TestExtractHL7Attributes.java | 30 +- 4 files changed, 405 insertions(+), 404 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5a559e45/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/pom.xml index 5f29cd8..f266835 100644 --- a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/pom.xml @@ -25,19 +25,19 @@ <artifactId>nifi-hl7-processors</artifactId> <packaging>jar</packaging> - <build> - <plugins> - <plugin> - <groupId>org.apache.rat</groupId> - <artifactId>apache-rat-plugin</artifactId> - <configuration> - <excludes> - <exclude>src/test/resources/hypoglycemia.hl7</exclude> - </excludes> - </configuration> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <configuration> + <excludes> + <exclude>src/test/resources/hypoglycemia.hl7</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> <dependencies> <dependency> @@ -50,56 +50,56 @@ </dependency> <dependency> - <groupId>org.apache.nifi</groupId> - <artifactId>nifi-hl7-query-language</artifactId> - <version>0.1.0-incubating-SNAPSHOT</version> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-hl7-query-language</artifactId> + <version>0.1.0-incubating-SNAPSHOT</version> </dependency> - <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-base</artifactId> - <version>2.2</version> - </dependency> <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v21</artifactId> - <version>2.2</version> - </dependency> - <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v22</artifactId> - <version>2.2</version> - </dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-base</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v21</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v22</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v23</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v231</artifactId> + <version>2.2</version> + </dependency> <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v23</artifactId> - <version>2.2</version> - </dependency> - <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v231</artifactId> - <version>2.2</version> - </dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v24</artifactId> + <version>2.2</version> + </dependency> <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v24</artifactId> - <version>2.2</version> - </dependency> - <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v25</artifactId> - <version>2.2</version> - </dependency> - <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v251</artifactId> - <version>2.2</version> - </dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v25</artifactId> + <version>2.2</version> + </dependency> <dependency> - <groupId>ca.uhn.hapi</groupId> - <artifactId>hapi-structures-v26</artifactId> - <version>2.2</version> - </dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v251</artifactId> + <version>2.2</version> + </dependency> + <dependency> + <groupId>ca.uhn.hapi</groupId> + <artifactId>hapi-structures-v26</artifactId> + <version>2.2</version> + </dependency> <dependency> <groupId>org.apache.nifi</groupId> http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5a559e45/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java index e4a0d53..574fb2d 100644 --- a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java +++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java @@ -55,193 +55,190 @@ import ca.uhn.hl7v2.model.Varies; import ca.uhn.hl7v2.parser.PipeParser; import ca.uhn.hl7v2.validation.impl.ValidationContextFactory; - @SideEffectFree @SupportsBatching @Tags({"HL7", "health level 7", "healthcare", "extract", "attributes"}) @CapabilityDescription("Extracts information from an HL7 (Health Level 7) formatted FlowFile and adds the information as FlowFile Attributes. " - + "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be " - + "<Segment Name> <underscore> <Segment Index> <dot> <Field Index>. For example, we may have an attribute named \"MHS.12\" with " - + "a value of \"2.1\" and an attribute named \"OBX_11.3\" with a value of \"93000^CPT4\".") + + "The attributes are named as <Segment Name> <dot> <Field Index>. If the segment is repeating, the naming will be " + + "<Segment Name> <underscore> <Segment Index> <dot> <Field Index>. For example, we may have an attribute named \"MHS.12\" with " + + "a value of \"2.1\" and an attribute named \"OBX_11.3\" with a value of \"93000^CPT4\".") public class ExtractHL7Attributes extends AbstractProcessor { - public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Character Encoding") - .description("The Character Encoding that is used to encode the HL7 data") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue("UTF-8") - .build(); - - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("A FlowFile is routed to this relationship if it is properly parsed as HL7 and its attributes extracted") - .build(); - - public static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("A FlowFile is routed to this relationship if it cannot be mapped to FlowFile Attributes. This would happen if the FlowFile does not contain valid HL7 data") - .build(); - - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CHARACTER_SET); - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - return relationships; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue()); - - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - @SuppressWarnings("resource") - final HapiContext hapiContext = new DefaultHapiContext(); - hapiContext.setValidationContext(ValidationContextFactory.noValidation()); - - final PipeParser parser = hapiContext.getPipeParser(); - final String hl7Text = new String(buffer, charset); - final Message message; - try { - message = parser.parse(hl7Text); - final Group group = message.getParent(); - - final Map<String, String> attributes = new HashMap<>(); - extractAttributes(group, attributes); - flowFile = session.putAllAttributes(flowFile, attributes); - getLogger().info("Successfully extracted {} attributes for {}; routing to success", new Object[] {attributes.size(), flowFile}); - getLogger().debug("Added the following attributes for {}: {}", new Object[] {flowFile, attributes}); - session.transfer(flowFile, REL_SUCCESS); - } catch (final HL7Exception e) { - getLogger().error("Failed to extract attributes from {} due to {}", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - } - - private void extractAttributes(final Group group, final Map<String, String> attributes) throws HL7Exception { - extractAttributes(group, attributes, new HashMap<String, Integer>()); - } - - private void extractAttributes(final Group group, final Map<String, String> attributes, final Map<String, Integer> segmentCounts) throws HL7Exception { - if ( group.isEmpty() ) { - return; - } - - final String[] structureNames = group.getNames(); - for ( final String structName : structureNames ) { - final Structure[] subStructures = group.getAll(structName); - - if ( group.isGroup(structName) ) { - for ( final Structure subStructure : subStructures ) { - final Group subGroup = (Group) subStructure; - extractAttributes(subGroup, attributes, segmentCounts); - } - } else { - for ( final Structure structure : subStructures ) { - final Segment segment = (Segment) structure ; - - final String segmentName = segment.getName(); - Integer segmentNum = segmentCounts.get(segmentName); - if (segmentNum == null) { - segmentNum = 1; - segmentCounts.put(segmentName, 1); - } else { - segmentNum++; - segmentCounts.put(segmentName, segmentNum); - } - - final boolean segmentRepeating = segment.getParent().isRepeating(segment.getName()); - final boolean parentRepeating = (segment.getParent().getParent() != segment.getParent() && segment.getParent().getParent().isRepeating(segment.getParent().getName())); - final boolean useSegmentIndex = segmentRepeating || parentRepeating; - - final Map<String, String> attributeMap = getAttributes(segment, useSegmentIndex ? segmentNum : null); - attributes.putAll(attributeMap); - } - } - } - } - - - private Map<String, String> getAttributes(final Segment segment, final Integer segmentNum) throws HL7Exception { - final Map<String, String> attributes = new HashMap<>(); - - for (int i=1; i <= segment.numFields(); i++) { - final String fieldName = segment.getName() + (segmentNum == null ? "" : "_" + segmentNum) + "." + i; - final Type[] types = segment.getField(i); - final StringBuilder sb = new StringBuilder(); - for ( final Type type : types ) { - final String typeValue = getValue(type); - if ( !typeValue.isEmpty() ) { - sb.append(typeValue).append("^"); - } - } - - if ( sb.length() == 0 ) { - continue; - } - String typeVal = sb.toString(); - if ( typeVal.endsWith("^") ) { - typeVal = typeVal.substring(0, typeVal.length() - 1); - } - - attributes.put(fieldName, typeVal); - } - - return attributes; - } - - - private String getValue(final Type type) { - if ( type == null ) { - return ""; - } - - if ( type instanceof Primitive ) { - final String value = ((Primitive) type).getValue(); - return value == null ? "" : value; - } else if ( type instanceof Composite ) { - final StringBuilder sb = new StringBuilder(); - final Composite composite = (Composite) type; - for ( final Type component : composite.getComponents() ) { - final String componentValue = getValue(component); - if ( !componentValue.isEmpty() ) { - sb.append(componentValue).append("^"); - } - } - - final String value = sb.toString(); - if ( value.endsWith("^") ) { - return value.substring(0, value.length() - 1); - } - - return value; - } else if ( type instanceof Varies ) { - final Varies varies = (Varies) type; - return getValue(varies.getData()); - } - - return ""; - } + + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Encoding") + .description("The Character Encoding that is used to encode the HL7 data") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("A FlowFile is routed to this relationship if it is properly parsed as HL7 and its attributes extracted") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("A FlowFile is routed to this relationship if it cannot be mapped to FlowFile Attributes. This would happen if the FlowFile does not contain valid HL7 data") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(CHARACTER_SET); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue()); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + @SuppressWarnings("resource") + final HapiContext hapiContext = new DefaultHapiContext(); + hapiContext.setValidationContext(ValidationContextFactory.noValidation()); + + final PipeParser parser = hapiContext.getPipeParser(); + final String hl7Text = new String(buffer, charset); + final Message message; + try { + message = parser.parse(hl7Text); + final Group group = message.getParent(); + + final Map<String, String> attributes = new HashMap<>(); + extractAttributes(group, attributes); + flowFile = session.putAllAttributes(flowFile, attributes); + getLogger().info("Successfully extracted {} attributes for {}; routing to success", new Object[]{attributes.size(), flowFile}); + getLogger().debug("Added the following attributes for {}: {}", new Object[]{flowFile, attributes}); + session.transfer(flowFile, REL_SUCCESS); + } catch (final HL7Exception e) { + getLogger().error("Failed to extract attributes from {} due to {}", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + } + + private void extractAttributes(final Group group, final Map<String, String> attributes) throws HL7Exception { + extractAttributes(group, attributes, new HashMap<String, Integer>()); + } + + private void extractAttributes(final Group group, final Map<String, String> attributes, final Map<String, Integer> segmentCounts) throws HL7Exception { + if (group.isEmpty()) { + return; + } + + final String[] structureNames = group.getNames(); + for (final String structName : structureNames) { + final Structure[] subStructures = group.getAll(structName); + + if (group.isGroup(structName)) { + for (final Structure subStructure : subStructures) { + final Group subGroup = (Group) subStructure; + extractAttributes(subGroup, attributes, segmentCounts); + } + } else { + for (final Structure structure : subStructures) { + final Segment segment = (Segment) structure; + + final String segmentName = segment.getName(); + Integer segmentNum = segmentCounts.get(segmentName); + if (segmentNum == null) { + segmentNum = 1; + segmentCounts.put(segmentName, 1); + } else { + segmentNum++; + segmentCounts.put(segmentName, segmentNum); + } + + final boolean segmentRepeating = segment.getParent().isRepeating(segment.getName()); + final boolean parentRepeating = (segment.getParent().getParent() != segment.getParent() && segment.getParent().getParent().isRepeating(segment.getParent().getName())); + final boolean useSegmentIndex = segmentRepeating || parentRepeating; + + final Map<String, String> attributeMap = getAttributes(segment, useSegmentIndex ? segmentNum : null); + attributes.putAll(attributeMap); + } + } + } + } + + private Map<String, String> getAttributes(final Segment segment, final Integer segmentNum) throws HL7Exception { + final Map<String, String> attributes = new HashMap<>(); + + for (int i = 1; i <= segment.numFields(); i++) { + final String fieldName = segment.getName() + (segmentNum == null ? "" : "_" + segmentNum) + "." + i; + final Type[] types = segment.getField(i); + final StringBuilder sb = new StringBuilder(); + for (final Type type : types) { + final String typeValue = getValue(type); + if (!typeValue.isEmpty()) { + sb.append(typeValue).append("^"); + } + } + + if (sb.length() == 0) { + continue; + } + String typeVal = sb.toString(); + if (typeVal.endsWith("^")) { + typeVal = typeVal.substring(0, typeVal.length() - 1); + } + + attributes.put(fieldName, typeVal); + } + + return attributes; + } + + private String getValue(final Type type) { + if (type == null) { + return ""; + } + + if (type instanceof Primitive) { + final String value = ((Primitive) type).getValue(); + return value == null ? "" : value; + } else if (type instanceof Composite) { + final StringBuilder sb = new StringBuilder(); + final Composite composite = (Composite) type; + for (final Type component : composite.getComponents()) { + final String componentValue = getValue(component); + if (!componentValue.isEmpty()) { + sb.append(componentValue).append("^"); + } + } + + final String value = sb.toString(); + if (value.endsWith("^")) { + return value.substring(0, value.length() - 1); + } + + return value; + } else if (type instanceof Varies) { + final Varies varies = (Varies) type; + return getValue(varies.getData()); + } + + return ""; + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5a559e45/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java index c8c4176..53e7e69 100644 --- a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java +++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java @@ -64,154 +64,158 @@ import ca.uhn.hl7v2.validation.impl.ValidationContextFactory; @SideEffectFree @SupportsBatching @Tags({"HL7", "healthcare", "route", "Health Level 7"}) -@DynamicProperties({@DynamicProperty(name="Name of a Relationship", value="An HL7 Query Language query", description="If a FlowFile matches the query, it will be routed to a relationship with the name of the property")}) -@WritesAttributes({@WritesAttribute(attribute="RouteHL7.Route", description="The name of the relationship to which the FlowFile was routed")}) +@DynamicProperties({ + @DynamicProperty(name = "Name of a Relationship", value = "An HL7 Query Language query", + description = "If a FlowFile matches the query, it will be routed to a relationship with the name of the property")}) +@WritesAttributes({ + @WritesAttribute(attribute = "RouteHL7.Route", description = "The name of the relationship to which the FlowFile was routed")}) @CapabilityDescription("Routes incoming HL7 data according to user-defined queries. To add a query, add a new property to the processor." - + " The name of the property will become a new relationship for the processor, and the value is an HL7 Query Language query. If" - + " a FlowFile matches the query, a copy of the FlowFile will be routed to the associated relationship.") + + " The name of the property will become a new relationship for the processor, and the value is an HL7 Query Language query. If" + + " a FlowFile matches the query, a copy of the FlowFile will be routed to the associated relationship.") public class RouteHL7 extends AbstractProcessor { - public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() - .name("Character Encoding") - .description("The Character Encoding that is used to encode the HL7 data") - .required(true) - .expressionLanguageSupported(true) - .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) - .defaultValue("UTF-8") - .build(); - - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("Any FlowFile that cannot be parsed as HL7 will be routed to this relationship") - .build(); - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("The original FlowFile that comes into this processor will be routed to this relationship, unless it is routed to 'failure'") - .build(); - - private volatile Map<Relationship, HL7Query> queries = new HashMap<>(); - - @Override - protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Specifies a query that will cause any HL7 message matching the query to be routed to the '" + propertyDescriptorName + "' relationship") - .required(false) - .dynamic(true) - .addValidator(new HL7QueryValidator()) - .build(); - } - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(CHARACTER_SET); - return properties; - } - - @Override - public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - if ( !descriptor.isDynamic() ) { - return; - } - - final Map<Relationship, HL7Query> updatedQueryMap = new HashMap<>(queries); - final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build(); - - if ( newValue == null ) { - updatedQueryMap.remove(relationship); - } else { - final HL7Query query = HL7Query.compile(newValue); - updatedQueryMap.put(relationship, query); - } - - this.queries = updatedQueryMap; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(queries.keySet()); - relationships.add(REL_FAILURE); - relationships.add(REL_ORIGINAL); - return relationships; - } - - @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if ( flowFile == null ) { - return; - } - - final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue()); - - final byte[] buffer = new byte[(int) flowFile.getSize()]; - session.read(flowFile, new InputStreamCallback() { - @Override - public void process(final InputStream in) throws IOException { - StreamUtils.fillBuffer(in, buffer); - } - }); - - @SuppressWarnings("resource") - final HapiContext hapiContext = new DefaultHapiContext(); - hapiContext.setValidationContext(ValidationContextFactory.noValidation()); - - final PipeParser parser = hapiContext.getPipeParser(); - final String hl7Text = new String(buffer, charset); - final HL7Message message; - try { - final Message hapiMessage = parser.parse(hl7Text); - message = new HapiMessage(hapiMessage); - } catch (final Exception e) { - getLogger().error("Failed to parse {} as HL7 due to {}; routing to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final Set<String> matchingRels = new HashSet<>(); - final Map<Relationship, HL7Query> queryMap = queries; - for ( final Map.Entry<Relationship, HL7Query> entry : queryMap.entrySet() ) { - final Relationship relationship = entry.getKey(); - final HL7Query query = entry.getValue(); - - final QueryResult result = query.evaluate(message); - if ( result.isMatch() ) { - FlowFile clone = session.clone(flowFile); - clone = session.putAttribute(clone, "RouteHL7.Route", relationship.getName()); - session.transfer(clone, relationship); - session.getProvenanceReporter().route(clone, relationship); - matchingRels.add(relationship.getName()); - } - } - - session.transfer(flowFile, REL_ORIGINAL); - getLogger().info("Routed a copy of {} to {} relationships: {}", new Object[] {flowFile, matchingRels.size(), matchingRels}); - } - - private static class HL7QueryValidator implements Validator { - - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - String error = null; - - try { - final HL7Query hl7Query = HL7Query.compile(input); - final List<Class<?>> returnTypes = hl7Query.getReturnTypes(); - if ( returnTypes.size() != 1 ) { - error = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE"; - } else if ( !HL7Message.class.isAssignableFrom(returnTypes.get(0)) ) { - error = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE"; - } - } catch (final HL7QueryParsingException e) { - error = e.toString(); - } - - if ( error == null ) { - return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); - } else { - return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(error).build(); - } - } - - } + + public static final PropertyDescriptor CHARACTER_SET = new PropertyDescriptor.Builder() + .name("Character Encoding") + .description("The Character Encoding that is used to encode the HL7 data") + .required(true) + .expressionLanguageSupported(true) + .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) + .defaultValue("UTF-8") + .build(); + + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("Any FlowFile that cannot be parsed as HL7 will be routed to this relationship") + .build(); + static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile that comes into this processor will be routed to this relationship, unless it is routed to 'failure'") + .build(); + + private volatile Map<Relationship, HL7Query> queries = new HashMap<>(); + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Specifies a query that will cause any HL7 message matching the query to be routed to the '" + propertyDescriptorName + "' relationship") + .required(false) + .dynamic(true) + .addValidator(new HL7QueryValidator()) + .build(); + } + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(CHARACTER_SET); + return properties; + } + + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + if (!descriptor.isDynamic()) { + return; + } + + final Map<Relationship, HL7Query> updatedQueryMap = new HashMap<>(queries); + final Relationship relationship = new Relationship.Builder().name(descriptor.getName()).build(); + + if (newValue == null) { + updatedQueryMap.remove(relationship); + } else { + final HL7Query query = HL7Query.compile(newValue); + updatedQueryMap.put(relationship, query); + } + + this.queries = updatedQueryMap; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(queries.keySet()); + relationships.add(REL_FAILURE); + relationships.add(REL_ORIGINAL); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final Charset charset = Charset.forName(context.getProperty(CHARACTER_SET).evaluateAttributeExpressions(flowFile).getValue()); + + final byte[] buffer = new byte[(int) flowFile.getSize()]; + session.read(flowFile, new InputStreamCallback() { + @Override + public void process(final InputStream in) throws IOException { + StreamUtils.fillBuffer(in, buffer); + } + }); + + @SuppressWarnings("resource") + final HapiContext hapiContext = new DefaultHapiContext(); + hapiContext.setValidationContext(ValidationContextFactory.noValidation()); + + final PipeParser parser = hapiContext.getPipeParser(); + final String hl7Text = new String(buffer, charset); + final HL7Message message; + try { + final Message hapiMessage = parser.parse(hl7Text); + message = new HapiMessage(hapiMessage); + } catch (final Exception e) { + getLogger().error("Failed to parse {} as HL7 due to {}; routing to failure", new Object[]{flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final Set<String> matchingRels = new HashSet<>(); + final Map<Relationship, HL7Query> queryMap = queries; + for (final Map.Entry<Relationship, HL7Query> entry : queryMap.entrySet()) { + final Relationship relationship = entry.getKey(); + final HL7Query query = entry.getValue(); + + final QueryResult result = query.evaluate(message); + if (result.isMatch()) { + FlowFile clone = session.clone(flowFile); + clone = session.putAttribute(clone, "RouteHL7.Route", relationship.getName()); + session.transfer(clone, relationship); + session.getProvenanceReporter().route(clone, relationship); + matchingRels.add(relationship.getName()); + } + } + + session.transfer(flowFile, REL_ORIGINAL); + getLogger().info("Routed a copy of {} to {} relationships: {}", new Object[]{flowFile, matchingRels.size(), matchingRels}); + } + + private static class HL7QueryValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + String error = null; + + try { + final HL7Query hl7Query = HL7Query.compile(input); + final List<Class<?>> returnTypes = hl7Query.getReturnTypes(); + if (returnTypes.size() != 1) { + error = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE"; + } else if (!HL7Message.class.isAssignableFrom(returnTypes.get(0))) { + error = "RouteHL7 requires that the HL7 Query return exactly 1 element of type MESSAGE"; + } + } catch (final HL7QueryParsingException e) { + error = e.toString(); + } + + if (error == null) { + return new ValidationResult.Builder().subject(subject).input(input).valid(true).build(); + } else { + return new ValidationResult.Builder().subject(subject).input(input).valid(false).explanation(error).build(); + } + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/5a559e45/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java ---------------------------------------------------------------------- diff --git a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java index c156810..d173c27 100644 --- a/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java +++ b/nifi/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/test/java/org/apache/nifi/processors/hl7/TestExtractHL7Attributes.java @@ -30,19 +30,19 @@ import org.junit.Test; public class TestExtractHL7Attributes { - @Test - public void testExtract() throws IOException { - System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); - final TestRunner runner = TestRunners.newTestRunner(ExtractHL7Attributes.class); - runner.enqueue(Paths.get("src/test/resources/hypoglycemia.hl7")); - - runner.run(); - runner.assertAllFlowFilesTransferred(ExtractHL7Attributes.REL_SUCCESS, 1); - final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractHL7Attributes.REL_SUCCESS).get(0); - final SortedMap<String, String> sortedAttrs = new TreeMap<>(out.getAttributes()); - for (final Map.Entry<String, String> entry : sortedAttrs.entrySet()) { - System.out.println(entry.getKey() + " : " + entry.getValue()); - } - } - + @Test + public void testExtract() throws IOException { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "DEBUG"); + final TestRunner runner = TestRunners.newTestRunner(ExtractHL7Attributes.class); + runner.enqueue(Paths.get("src/test/resources/hypoglycemia.hl7")); + + runner.run(); + runner.assertAllFlowFilesTransferred(ExtractHL7Attributes.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(ExtractHL7Attributes.REL_SUCCESS).get(0); + final SortedMap<String, String> sortedAttrs = new TreeMap<>(out.getAttributes()); + for (final Map.Entry<String, String> entry : sortedAttrs.entrySet()) { + System.out.println(entry.getKey() + " : " + entry.getValue()); + } + } + }
