http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java new file mode 100644 index 0000000..4eb7001 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardProvenanceEventRecord.java @@ -0,0 +1,751 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.Relationship; + +/** + * Holder for provenance relevant information + * <p/> + */ +public final class StandardProvenanceEventRecord implements ProvenanceEventRecord { + + private final long eventTime; + private final long entryDate; + private final ProvenanceEventType eventType; + private final long lineageStartDate; + private final Set<String> lineageIdentifiers; + private final String componentId; + private final String componentType; + private final String transitUri; + private final String sourceSystemFlowFileIdentifier; + private final String uuid; + private final List<String> parentUuids; + private final List<String> childrenUuids; + private final String alternateIdentifierUri; + private final String details; + private final String relationship; + private final long storageByteOffset; + private final String storageFilename; + private final long eventDuration; + + private final String contentClaimSection; + private final String contentClaimContainer; + private final String contentClaimIdentifier; + private final Long contentClaimOffset; + private final long contentSize; + + private final String previousClaimSection; + private final String previousClaimContainer; + private final String previousClaimIdentifier; + private final Long previousClaimOffset; + private final Long previousSize; + + private final String sourceQueueIdentifier; + + private final Map<String, String> previousAttributes; + private final Map<String, String> updatedAttributes; + + private volatile long eventId; + + private StandardProvenanceEventRecord(final Builder builder) { + this.eventTime = builder.eventTime; + this.entryDate = builder.entryDate; + this.eventType = builder.eventType; + this.componentId = builder.componentId; + this.componentType = builder.componentType; + this.transitUri = builder.transitUri; + this.sourceSystemFlowFileIdentifier = builder.sourceSystemFlowFileIdentifier; + this.uuid = builder.uuid; + this.parentUuids = builder.parentUuids; + this.childrenUuids = builder.childrenUuids; + this.alternateIdentifierUri = builder.alternateIdentifierUri; + this.details = builder.details; + this.relationship = builder.relationship; + this.storageByteOffset = builder.storageByteOffset; + this.storageFilename = builder.storageFilename; + this.eventDuration = builder.eventDuration; + this.lineageStartDate = builder.lineageStartDate; + this.lineageIdentifiers = Collections.unmodifiableSet(builder.lineageIdentifiers); + + previousClaimSection = builder.previousClaimSection; + previousClaimContainer = builder.previousClaimContainer; + previousClaimIdentifier = builder.previousClaimIdentifier; + previousClaimOffset = builder.previousClaimOffset; + previousSize = builder.previousSize; + + contentClaimSection = builder.contentClaimSection; + contentClaimContainer = builder.contentClaimContainer; + contentClaimIdentifier = builder.contentClaimIdentifier; + contentClaimOffset = builder.contentClaimOffset; + contentSize = builder.contentSize; + + previousAttributes = builder.previousAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.previousAttributes); + updatedAttributes = builder.updatedAttributes == null ? Collections.<String, String>emptyMap() : Collections.unmodifiableMap(builder.updatedAttributes); + + sourceQueueIdentifier = builder.sourceQueueIdentifier; + + } + + public String getStorageFilename() { + return storageFilename; + } + + public long getStorageByteOffset() { + return storageByteOffset; + } + + void setEventId(final long eventId) { + this.eventId = eventId; + } + + @Override + public long getEventId() { + return eventId; + } + + @Override + public long getEventTime() { + return eventTime; + } + + @Override + public Set<String> getLineageIdentifiers() { + return lineageIdentifiers; + } + + @Override + public long getLineageStartDate() { + return lineageStartDate; + } + + @Override + public long getFileSize() { + return contentSize; + } + + @Override + public Long getPreviousFileSize() { + return previousSize; + } + + @Override + public ProvenanceEventType getEventType() { + return eventType; + } + + @Override + public Map<String, String> getAttributes() { + final Map<String, String> allAttrs = new HashMap<>(previousAttributes.size() + updatedAttributes.size()); + allAttrs.putAll(previousAttributes); + for (final Map.Entry<String, String> entry : updatedAttributes.entrySet()) { + if (entry.getValue() != null) { + allAttrs.put(entry.getKey(), entry.getValue()); + } + } + return allAttrs; + } + + @Override + public String getComponentId() { + return componentId; + } + + @Override + public String getComponentType() { + return componentType; + } + + @Override + public String getTransitUri() { + return transitUri; + } + + @Override + public String getSourceSystemFlowFileIdentifier() { + return sourceSystemFlowFileIdentifier; + } + + @Override + public String getFlowFileUuid() { + return uuid; + } + + @Override + public List<String> getParentUuids() { + return parentUuids == null ? Collections.<String>emptyList() : parentUuids; + } + + @Override + public List<String> getChildUuids() { + return childrenUuids == null ? Collections.<String>emptyList() : childrenUuids; + } + + @Override + public String getAlternateIdentifierUri() { + return alternateIdentifierUri; + } + + @Override + public long getEventDuration() { + return eventDuration; + } + + @Override + public String getDetails() { + return details; + } + + @Override + public String getRelationship() { + return relationship; + } + + @Override + public long getFlowFileEntryDate() { + return entryDate; + } + + @Override + public String getContentClaimSection() { + return contentClaimSection; + } + + @Override + public String getContentClaimContainer() { + return contentClaimContainer; + } + + @Override + public String getContentClaimIdentifier() { + return contentClaimIdentifier; + } + + @Override + public Long getContentClaimOffset() { + return contentClaimOffset; + } + + @Override + public String getSourceQueueIdentifier() { + return sourceQueueIdentifier; + } + + @Override + public Map<String, String> getPreviousAttributes() { + return previousAttributes; + } + + @Override + public String getPreviousContentClaimContainer() { + return previousClaimContainer; + } + + @Override + public String getPreviousContentClaimIdentifier() { + return previousClaimIdentifier; + } + + @Override + public Long getPreviousContentClaimOffset() { + return previousClaimOffset; + } + + @Override + public String getPreviousContentClaimSection() { + return previousClaimSection; + } + + @Override + public Map<String, String> getUpdatedAttributes() { + return updatedAttributes; + } + + @Override + public int hashCode() { + final int eventTypeCode; + if (eventType == ProvenanceEventType.CLONE || eventType == ProvenanceEventType.JOIN || eventType == ProvenanceEventType.FORK) { + eventTypeCode = 1472; + } else if (eventType == ProvenanceEventType.REPLAY) { + eventTypeCode = 21479 + (int) (0x7FFFFFFF & eventTime); // use lower bits of event time. + } else { + eventTypeCode = 4812 + eventType.hashCode() + 4 * uuid.hashCode(); + } + + return -37423 + 3 * componentId.hashCode() + (transitUri == null ? 0 : 41 * transitUri.hashCode()) + + (relationship == null ? 0 : 47 * relationship.hashCode()) + 44 * eventTypeCode; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + if (!(obj instanceof StandardProvenanceEventRecord)) { + return false; + } + + final StandardProvenanceEventRecord other = (StandardProvenanceEventRecord) obj; + // If event ID's are populated and not equal, return false. If they have not yet been populated, do not + // use them in the comparison. + if (eventId > 0L && other.getEventId() > 0L && eventId != other.getEventId()) { + return false; + } + if (eventType != other.eventType) { + return false; + } + + if (!componentId.equals(other.componentId)) { + return false; + } + + if (different(parentUuids, other.parentUuids)) { + return false; + } + + if (different(childrenUuids, other.childrenUuids)) { + return false; + } + + // SPAWN had issues indicating which should be the event's FlowFileUUID in the case that there is 1 parent and 1 child. + if (!uuid.equals(other.uuid)) { + return false; + } + + if (different(transitUri, other.transitUri)) { + return false; + } + + if (different(relationship, other.relationship)) { + return false; + } + + return !(eventType == ProvenanceEventType.REPLAY && eventTime != other.getEventTime()); + } + + private boolean different(final Object a, final Object b) { + if (a == null && b == null) { + return false; + } + if (a == null || b == null) { + return true; + } + + return !a.equals(b); + } + + private boolean different(final List<String> a, final List<String> b) { + if (a == null && b == null) { + return false; + } + + if (a == null && b != null) { + return true; + } + + if (a != null && b == null) { + return true; + } + + if (a.size() != b.size()) { + return true; + } + + final List<String> sortedA = new ArrayList<>(a); + final List<String> sortedB = new ArrayList<>(b); + + Collections.sort(sortedA); + Collections.sort(sortedB); + + for (int i = 0; i < sortedA.size(); i++) { + if (!sortedA.get(i).equals(sortedB.get(i))) { + return true; + } + } + + return false; + } + + @Override + public String toString() { + return "ProvenanceEventRecord [" + + "eventId=" + eventId + + ", eventType=" + eventType + + ", eventTime=" + new Date(eventTime) + + ", uuid=" + uuid + + ", fileSize=" + contentSize + + ", componentId=" + componentId + + ", transitUri=" + transitUri + + ", sourceSystemFlowFileIdentifier=" + sourceSystemFlowFileIdentifier + + ", parentUuids=" + parentUuids + + ", alternateIdentifierUri=" + alternateIdentifierUri + "]"; + } + + public static class Builder implements ProvenanceEventBuilder { + + private long eventTime = System.currentTimeMillis(); + private long entryDate; + private long lineageStartDate; + private Set<String> lineageIdentifiers = new HashSet<>(); + private ProvenanceEventType eventType = null; + private String componentId = null; + private String componentType = null; + private String sourceSystemFlowFileIdentifier = null; + private String transitUri = null; + private String uuid = null; + private List<String> parentUuids = null; + private List<String> childrenUuids = null; + private String contentType = null; + private String alternateIdentifierUri = null; + private String details = null; + private String relationship = null; + private long storageByteOffset = -1L; + private long eventDuration = -1L; + private String storageFilename; + + private String contentClaimSection; + private String contentClaimContainer; + private String contentClaimIdentifier; + private Long contentClaimOffset; + private Long contentSize; + + private String previousClaimSection; + private String previousClaimContainer; + private String previousClaimIdentifier; + private Long previousClaimOffset; + private Long previousSize; + + private String sourceQueueIdentifier; + + private Map<String, String> previousAttributes; + private Map<String, String> updatedAttributes; + + @Override + public Builder fromEvent(final ProvenanceEventRecord event) { + eventTime = event.getEventTime(); + entryDate = event.getFlowFileEntryDate(); + lineageStartDate = event.getLineageStartDate(); + lineageIdentifiers = event.getLineageIdentifiers(); + eventType = event.getEventType(); + componentId = event.getComponentId(); + componentType = event.getComponentType(); + transitUri = event.getTransitUri(); + sourceSystemFlowFileIdentifier = event.getSourceSystemFlowFileIdentifier(); + uuid = event.getFlowFileUuid(); + parentUuids = event.getParentUuids(); + childrenUuids = event.getChildUuids(); + alternateIdentifierUri = event.getAlternateIdentifierUri(); + eventDuration = event.getEventDuration(); + previousAttributes = event.getPreviousAttributes(); + updatedAttributes = event.getUpdatedAttributes(); + details = event.getDetails(); + relationship = event.getRelationship(); + + contentClaimSection = event.getContentClaimSection(); + contentClaimContainer = event.getContentClaimContainer(); + contentClaimIdentifier = event.getContentClaimIdentifier(); + contentClaimOffset = event.getContentClaimOffset(); + contentSize = event.getFileSize(); + + previousClaimSection = event.getPreviousContentClaimSection(); + previousClaimContainer = event.getPreviousContentClaimContainer(); + previousClaimIdentifier = event.getPreviousContentClaimIdentifier(); + previousClaimOffset = event.getPreviousContentClaimOffset(); + previousSize = event.getPreviousFileSize(); + + sourceQueueIdentifier = event.getSourceQueueIdentifier(); + + if (event instanceof StandardProvenanceEventRecord) { + final StandardProvenanceEventRecord standardProvEvent = (StandardProvenanceEventRecord) event; + storageByteOffset = standardProvEvent.storageByteOffset; + storageFilename = standardProvEvent.storageFilename; + } + + return this; + } + + @Override + public Builder setFlowFileEntryDate(final long entryDate) { + this.entryDate = entryDate; + return this; + } + + @Override + public Builder setLineageIdentifiers(final Set<String> lineageIdentifiers) { + this.lineageIdentifiers = lineageIdentifiers; + return this; + } + + @Override + public Builder setAttributes(final Map<String, String> previousAttributes, final Map<String, String> updatedAttributes) { + this.previousAttributes = previousAttributes; + this.updatedAttributes = updatedAttributes; + return this; + } + + @Override + public Builder setFlowFileUUID(final String uuid) { + this.uuid = uuid; + return this; + } + + public Builder setStorageLocation(final String filename, final long offset) { + this.storageFilename = filename; + this.storageByteOffset = offset; + return this; + } + + @Override + public Builder setEventTime(long eventTime) { + this.eventTime = eventTime; + return this; + } + + @Override + public Builder setEventDuration(final long millis) { + this.eventDuration = millis; + return this; + } + + @Override + public Builder setLineageStartDate(final long startDate) { + this.lineageStartDate = startDate; + return this; + } + + public Builder addLineageIdentifier(final String lineageIdentifier) { + this.lineageIdentifiers.add(lineageIdentifier); + return this; + } + + @Override + public Builder setEventType(ProvenanceEventType eventType) { + this.eventType = eventType; + return this; + } + + @Override + public Builder setComponentId(String componentId) { + this.componentId = componentId; + return this; + } + + @Override + public Builder setComponentType(String componentType) { + this.componentType = componentType; + return this; + } + + @Override + public Builder setSourceSystemFlowFileIdentifier(String sourceSystemFlowFileIdentifier) { + this.sourceSystemFlowFileIdentifier = sourceSystemFlowFileIdentifier; + return this; + } + + @Override + public Builder setTransitUri(String transitUri) { + this.transitUri = transitUri; + return this; + } + + @Override + public Builder addParentFlowFile(final FlowFile parentFlowFile) { + if (this.parentUuids == null) { + this.parentUuids = new ArrayList<>(); + } + this.parentUuids.add(parentFlowFile.getAttribute(CoreAttributes.UUID.key())); + return this; + } + + public Builder addParentUuid(final String uuid) { + if (this.parentUuids == null) { + this.parentUuids = new ArrayList<>(); + } + this.parentUuids.add(uuid); + return this; + } + + @Override + public Builder removeParentFlowFile(final FlowFile parentFlowFile) { + if (this.parentUuids == null) { + return this; + } + + parentUuids.remove(parentFlowFile.getAttribute(CoreAttributes.UUID.key())); + return this; + } + + @Override + public Builder addChildFlowFile(final FlowFile childFlowFile) { + if (this.childrenUuids == null) { + this.childrenUuids = new ArrayList<>(); + } + this.childrenUuids.add(childFlowFile.getAttribute(CoreAttributes.UUID.key())); + return this; + } + + public Builder addChildUuid(final String uuid) { + if (this.childrenUuids == null) { + this.childrenUuids = new ArrayList<>(); + } + this.childrenUuids.add(uuid); + return this; + } + + @Override + public Builder removeChildFlowFile(final FlowFile childFlowFile) { + if (this.childrenUuids == null) { + return this; + } + + childrenUuids.remove(childFlowFile.getAttribute(CoreAttributes.UUID.key())); + return this; + } + + public Builder setContentType(String contentType) { + this.contentType = contentType; + return this; + } + + @Override + public Builder setAlternateIdentifierUri(String alternateIdentifierUri) { + this.alternateIdentifierUri = alternateIdentifierUri; + return this; + } + + @Override + public Builder setDetails(String details) { + this.details = details; + return this; + } + + @Override + public Builder setRelationship(Relationship relationship) { + this.relationship = relationship.getName(); + return this; + } + + public Builder setRelationship(final String relationship) { + this.relationship = relationship; + return this; + } + + @Override + public ProvenanceEventBuilder fromFlowFile(final FlowFile flowFile) { + setFlowFileEntryDate(flowFile.getEntryDate()); + setLineageIdentifiers(flowFile.getLineageIdentifiers()); + setLineageStartDate(flowFile.getLineageStartDate()); + setAttributes(Collections.<String, String>emptyMap(), flowFile.getAttributes()); + uuid = flowFile.getAttribute(CoreAttributes.UUID.key()); + this.contentSize = flowFile.getSize(); + return this; + } + + @Override + public Builder setPreviousContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) { + previousClaimSection = section; + previousClaimContainer = container; + previousClaimIdentifier = identifier; + previousClaimOffset = offset; + previousSize = size; + return this; + } + + @Override + public Builder setCurrentContentClaim(final String container, final String section, final String identifier, final Long offset, final long size) { + contentClaimSection = section; + contentClaimContainer = container; + contentClaimIdentifier = identifier; + contentClaimOffset = offset; + contentSize = size; + return this; + } + + @Override + public Builder setSourceQueueIdentifier(final String identifier) { + sourceQueueIdentifier = identifier; + return this; + } + + private void assertSet(final Object value, final String name) { + if (value == null) { + throw new IllegalStateException("Cannot create Provenance Event Record because " + name + " is not set"); + } + } + + public ProvenanceEventType getEventType() { + return eventType; + } + + public List<String> getChildUuids() { + return Collections.unmodifiableList(childrenUuids); + } + + public List<String> getParentUuids() { + return Collections.unmodifiableList(parentUuids); + } + + @Override + public StandardProvenanceEventRecord build() { + assertSet(eventType, "Event Type"); + assertSet(componentId, "Component ID"); + assertSet(componentType, "Component Type"); + assertSet(uuid, "FlowFile UUID"); + assertSet(contentSize, "FlowFile Size"); + + switch (eventType) { + case ADDINFO: + if (alternateIdentifierUri == null) { + throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no alternate identifiers have been set"); + } + break; + case RECEIVE: + case SEND: + assertSet(transitUri, "Transit URI"); + break; + case ROUTE: + assertSet(relationship, "Relationship"); + break; + case CLONE: + case FORK: + case JOIN: + if ((parentUuids == null || parentUuids.isEmpty()) && (childrenUuids == null || childrenUuids.isEmpty())) { + throw new IllegalStateException("Cannot create Provenance Event Record of type " + eventType + " because no Parent UUIDs or Children UUIDs have been set"); + } + break; + default: + break; + } + + return new StandardProvenanceEventRecord(this); + } + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java new file mode 100644 index 0000000..9a9a27d --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/StandardQueryResult.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import org.apache.nifi.provenance.search.Query; +import org.apache.nifi.provenance.search.QueryResult; + +public class StandardQueryResult implements QueryResult { + + public static final int TTL = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.MINUTES); + private final Query query; + private final long creationNanos; + + private final int numSteps; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock readLock = rwLock.readLock(); + + private final Lock writeLock = rwLock.writeLock(); + // guarded by writeLock + private final List<ProvenanceEventRecord> matchingRecords = new ArrayList<>(); + private long totalHitCount; + private int numCompletedSteps = 0; + private Date expirationDate; + private String error; + private long queryTime; + + private volatile boolean canceled = false; + + public StandardQueryResult(final Query query, final int numSteps) { + this.query = query; + this.numSteps = numSteps; + this.creationNanos = System.nanoTime(); + + updateExpiration(); + } + + @Override + public List<ProvenanceEventRecord> getMatchingEvents() { + readLock.lock(); + try { + if (matchingRecords.size() <= query.getMaxResults()) { + return new ArrayList<>(matchingRecords); + } + + final List<ProvenanceEventRecord> copy = new ArrayList<>(query.getMaxResults()); + for (int i = 0; i < query.getMaxResults(); i++) { + copy.add(matchingRecords.get(i)); + } + + return copy; + } finally { + readLock.unlock(); + } + } + + @Override + public long getTotalHitCount() { + readLock.lock(); + try { + return totalHitCount; + } finally { + readLock.unlock(); + } + } + + @Override + public long getQueryTime() { + return queryTime; + } + + @Override + public Date getExpiration() { + return expirationDate; + } + + @Override + public String getError() { + return error; + } + + @Override + public int getPercentComplete() { + readLock.lock(); + try { + return (numSteps < 1) ? 100 : (int) (((float) numCompletedSteps / (float) numSteps) * 100.0F); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isFinished() { + readLock.lock(); + try { + return numCompletedSteps >= numSteps || canceled; + } finally { + readLock.unlock(); + } + } + + void cancel() { + this.canceled = true; + } + + public void setError(final String error) { + writeLock.lock(); + try { + this.error = error; + numCompletedSteps++; + + updateExpiration(); + if (numCompletedSteps >= numSteps) { + final long searchNanos = System.nanoTime() - creationNanos; + queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); + } + } finally { + writeLock.unlock(); + } + } + + public void update(final Collection<ProvenanceEventRecord> matchingRecords, final long totalHits) { + writeLock.lock(); + try { + this.matchingRecords.addAll(matchingRecords); + this.totalHitCount += totalHits; + + numCompletedSteps++; + updateExpiration(); + + if (numCompletedSteps >= numSteps) { + final long searchNanos = System.nanoTime() - creationNanos; + queryTime = TimeUnit.MILLISECONDS.convert(searchNanos, TimeUnit.NANOSECONDS); + } + } finally { + writeLock.unlock(); + } + } + + /** + * Must be called with write lock! + */ + private void updateExpiration() { + expirationDate = new Date(System.currentTimeMillis() + TTL); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java new file mode 100644 index 0000000..0aaf5ef --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EdgeNode.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import static java.util.Objects.requireNonNull; + +public class EdgeNode implements LineageEdge { + + private final String uuid; + private final LineageNode source; + private final LineageNode destination; + + public EdgeNode(final String uuid, final LineageNode source, final LineageNode destination) { + this.uuid = uuid; + this.source = requireNonNull(source); + this.destination = requireNonNull(destination); + } + + @Override + public String getUuid() { + return uuid; + } + + @Override + public LineageNode getSource() { + return source; + } + + @Override + public LineageNode getDestination() { + return destination; + } + + @Override + public int hashCode() { + return 43298293 + source.hashCode() + destination.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (this == obj) { + return true; + } + if (obj == null) { + return false; + } + + if (!(obj instanceof EdgeNode)) { + return false; + } + + final EdgeNode other = (EdgeNode) obj; + return (source.equals(other.source) && destination.equals(other.destination)); + } + + @Override + public String toString() { + return "Edge[Source=" + source + ", Destination=" + destination + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java new file mode 100644 index 0000000..1f8d1dc --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/EventNode.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import java.util.List; + +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.provenance.ProvenanceEventRecord; +import org.apache.nifi.provenance.ProvenanceEventType; + +public class EventNode implements ProvenanceEventLineageNode { + + private final ProvenanceEventRecord record; + private String clusterNodeIdentifier = null; + + public EventNode(final ProvenanceEventRecord event) { + this.record = event; + } + + @Override + public String getIdentifier() { + return String.valueOf(getEventIdentifier()); + } + + @Deprecated + @Override + public String getClusterNodeIdentifier() { + return clusterNodeIdentifier; + } + + @Deprecated + public void setClusterNodeIdentifier(final String nodeIdentifier) { + this.clusterNodeIdentifier = nodeIdentifier; + } + + @Override + public LineageNodeType getNodeType() { + return LineageNodeType.PROVENANCE_EVENT_NODE; + } + + @Override + public ProvenanceEventType getEventType() { + return record.getEventType(); + } + + @Override + public long getTimestamp() { + return record.getEventTime(); + } + + @Override + public long getEventIdentifier() { + return record.getEventId(); + } + + @Override + public String getFlowFileUuid() { + return record.getAttributes().get(CoreAttributes.UUID.key()); + } + + @Override + public List<String> getParentUuids() { + return record.getParentUuids(); + } + + @Override + public List<String> getChildUuids() { + return record.getChildUuids(); + } + + @Override + public int hashCode() { + return 2938472 + record.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (this == obj) { + return true; + } + + if (!(obj instanceof EventNode)) { + return false; + } + + final EventNode other = (EventNode) obj; + return record.equals(other.record); + } + + @Override + public String toString() { + return "Event[ID=" + record.getEventId() + ", Type=" + record.getEventType() + ", UUID=" + record.getFlowFileUuid() + ", Component=" + record.getComponentId() + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java new file mode 100644 index 0000000..c36c38d --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileLineage.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static java.util.Objects.requireNonNull; + +public class FlowFileLineage implements Lineage { + + private final List<LineageNode> nodes; + private final List<LineageEdge> edges; + + public FlowFileLineage(final Collection<LineageNode> nodes, final Collection<LineageEdge> edges) { + this.nodes = new ArrayList<>(requireNonNull(nodes)); + this.edges = new ArrayList<>(requireNonNull(edges)); + } + + @Override + public List<LineageNode> getNodes() { + return nodes; + } + + @Override + public List<LineageEdge> getEdges() { + return edges; + } + + @Override + public int hashCode() { + int sum = 923; + for (final LineageNode node : nodes) { + sum += node.hashCode(); + } + + for (final LineageEdge edge : edges) { + sum += edge.hashCode(); + } + + return sum; + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + + if (obj == this) { + return true; + } + + if (!(obj instanceof FlowFileLineage)) { + return false; + } + + final FlowFileLineage other = (FlowFileLineage) obj; + return nodes.equals(other.nodes) && edges.equals(other.edges); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java new file mode 100644 index 0000000..94e7661 --- /dev/null +++ b/nifi-commons/nifi-data-provenance-utils/src/main/java/org/apache/nifi/provenance/lineage/FlowFileNode.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.provenance.lineage; + +import static java.util.Objects.requireNonNull; + +public class FlowFileNode implements LineageNode { + + private final String flowFileUuid; + private final long creationTime; + private String clusterNodeIdentifier; + + public FlowFileNode(final String flowFileUuid, final long flowFileCreationTime) { + this.flowFileUuid = requireNonNull(flowFileUuid); + this.creationTime = flowFileCreationTime; + } + + @Override + public String getIdentifier() { + return flowFileUuid; + } + + @Override + public long getTimestamp() { + return creationTime; + } + + @Deprecated + @Override + public String getClusterNodeIdentifier() { + return clusterNodeIdentifier; + } + + @Override + public LineageNodeType getNodeType() { + return LineageNodeType.FLOWFILE_NODE; + } + + @Override + public String getFlowFileUuid() { + return flowFileUuid; + } + + @Override + public int hashCode() { + return 23498723 + flowFileUuid.hashCode(); + } + + @Override + public boolean equals(final Object obj) { + if (obj == null) { + return false; + } + if (obj == this) { + return true; + } + + if (!(obj instanceof FlowFileNode)) { + return false; + } + + final FlowFileNode other = (FlowFileNode) obj; + return flowFileUuid.equals(other.flowFileUuid); + } + + @Override + public String toString() { + return "FlowFile[UUID=" + flowFileUuid + "]"; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/pom.xml b/nifi-commons/nifi-expression-language/pom.xml new file mode 100644 index 0000000..3f0b8be --- /dev/null +++ b/nifi-commons/nifi-expression-language/pom.xml @@ -0,0 +1,56 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-commons</artifactId> + <version>0.3.0-SNAPSHOT</version> + </parent> + <artifactId>nifi-expression-language</artifactId> + <packaging>jar</packaging> + <build> + <plugins> + <plugin> + <groupId>org.antlr</groupId> + <artifactId>antlr3-maven-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>antlr</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-checkstyle-plugin</artifactId> + <configuration> + <excludes>**/antlr/AttributeExpressionParser.java,**/antlr/AttributeExpressionLexer.java</excludes> + </configuration> + </plugin> + </plugins> + </build> + <dependencies> + <dependency> + <groupId>org.antlr</groupId> + <artifactId>antlr-runtime</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-api</artifactId> + </dependency> + </dependencies> +</project> http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g new file mode 100644 index 0000000..80581f5 --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionLexer.g @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +lexer grammar AttributeExpressionLexer; + +@header { + package org.apache.nifi.attribute.expression.language.antlr; + import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; +} + +@rulecatch { + catch(final Exception e) { + throw new AttributeExpressionLanguageParsingException(e); + } +} + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new AttributeExpressionLanguageParsingException(sb.toString()); + } + + public void recover(RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new AttributeExpressionLanguageParsingException(sb.toString()); + } +} + + +// PUNCTUATION & SPECIAL CHARACTERS +WHITESPACE : (' '|'\t'|'\n'|'\r')+ { $channel = HIDDEN; }; +COMMENT : '#' ( ~('\n') )* '\n' { $channel = HIDDEN; }; + +DOLLAR : '$'; +LPAREN : '('; +RPAREN : ')'; +LBRACE : '{'; +RBRACE : '}'; +COLON : ':'; +COMMA : ','; +DOT : '.'; +SEMICOLON : ';'; +NUMBER : ('0'..'9')+; + +TRUE : 'true'; +FALSE : 'false'; + +// +// FUNCTION NAMES +// + +// ATTRIBUTE KEY SELECTION FUNCTIONS +ANY_ATTRIBUTE : 'anyAttribute'; +ANY_MATCHING_ATTRIBUTE : 'anyMatchingAttribute'; +ALL_ATTRIBUTES : 'allAttributes'; +ALL_MATCHING_ATTRIBUTES : 'allMatchingAttributes'; +ANY_DELINEATED_VALUE : 'anyDelineatedValue'; +ALL_DELINEATED_VALUES : 'allDelineatedValues'; + +// NO-SUBJECT FUNCTIONS +NEXT_INT : 'nextInt'; +IP : 'ip'; +UUID : 'UUID'; +HOSTNAME : 'hostname'; // requires boolean arg: prefer FQDN +NOW : 'now'; + + +// 0 arg functions +TO_UPPER : 'toUpper'; +TO_LOWER : 'toLower'; +TO_STRING : 'toString'; +LENGTH : 'length'; +TRIM : 'trim'; +IS_NULL : 'isNull'; +IS_EMPTY : 'isEmpty'; +NOT_NULL : 'notNull'; +TO_NUMBER : 'toNumber'; +URL_ENCODE : 'urlEncode'; +URL_DECODE : 'urlDecode'; +NOT : 'not'; +COUNT : 'count'; + +// 1 arg functions +SUBSTRING_AFTER : 'substringAfter'; +SUBSTRING_BEFORE : 'substringBefore'; +SUBSTRING_AFTER_LAST : 'substringAfterLast'; +SUBSTRING_BEFORE_LAST : 'substringBeforeLast'; +STARTS_WITH : 'startsWith'; +ENDS_WITH : 'endsWith'; +CONTAINS : 'contains'; +PREPEND : 'prepend'; +APPEND : 'append'; +INDEX_OF : 'indexOf'; +LAST_INDEX_OF : 'lastIndexOf'; +REPLACE_NULL : 'replaceNull'; +REPLACE_EMPTY : 'replaceEmpty'; +FIND : 'find'; // regex +MATCHES : 'matches'; // regex +EQUALS : 'equals'; +EQUALS_IGNORE_CASE : 'equalsIgnoreCase'; +GREATER_THAN : 'gt'; +LESS_THAN : 'lt'; +GREATER_THAN_OR_EQUAL : 'ge'; +LESS_THAN_OR_EQUAL : 'le'; +FORMAT : 'format'; // takes string date format; uses SimpleDateFormat +TO_DATE : 'toDate'; // takes string date format; converts the subject to a Long based on the date format +MOD : 'mod'; +PLUS : 'plus'; +MINUS : 'minus'; +MULTIPLY : 'multiply'; +DIVIDE : 'divide'; +TO_RADIX : 'toRadix'; +OR : 'or'; +AND : 'and'; +JOIN : 'join'; +TO_LITERAL : 'literal'; + +// 2 arg functions +SUBSTRING : 'substring'; +REPLACE : 'replace'; +REPLACE_ALL : 'replaceAll'; + + +// STRINGS +STRING_LITERAL +@init{StringBuilder lBuf = new StringBuilder();} + : + ( + '"' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '"' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '"' + ) + { + setText(lBuf.toString()); + } + | + ( + '\'' + ( + escaped=ESC {lBuf.append(getText());} | + normal = ~( '\'' | '\\' | '\n' | '\r' | '\t' ) { lBuf.appendCodePoint(normal);} + )* + '\'' + ) + { + setText(lBuf.toString()); + } + ; + + +fragment +ESC + : '\\' + ( + '"' { setText("\""); } + | '\'' { setText("\'"); } + | 'r' { setText("\r"); } + | 'n' { setText("\n"); } + | 't' { setText("\t"); } + | '\\' { setText("\\\\"); } + | nextChar = ~('"' | '\'' | 'r' | 'n' | 't' | '\\') + { + StringBuilder lBuf = new StringBuilder(); lBuf.append("\\\\").appendCodePoint(nextChar); setText(lBuf.toString()); + } + ) + ; + +ATTRIBUTE_NAME : ( + ~('$' | '{' | '}' | '(' | ')' | '[' | ']' | ',' | ':' | ';' | '/' | '*' | '\'' | ' ' | '\t' | '\r' | '\n' | '0'..'9') + ~('$' | '{' | '}' | '(' | ')' | '[' | ']' | ',' | ':' | ';' | '/' | '*' | '\'' | ' ' | '\t' | '\r' | '\n')* + ); http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g new file mode 100644 index 0000000..7c37530 --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/AttributeExpressionParser.g @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +parser grammar AttributeExpressionParser; + +options { + output=AST; + tokenVocab=AttributeExpressionLexer; +} + +tokens { + QUERY; + ATTRIBUTE_REFERENCE; + ATTR_NAME; + FUNCTION_CALL; + EXPRESSION; + MULTI_ATTRIBUTE_REFERENCE; + QUOTED_ATTR_NAME; +} + +@header { + package org.apache.nifi.attribute.expression.language.antlr; + import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageParsingException; +} + +@members { + public void displayRecognitionError(String[] tokenNames, RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new AttributeExpressionLanguageParsingException(sb.toString()); + } + + public void recover(final RecognitionException e) { + final StringBuilder sb = new StringBuilder(); + if ( e.token == null ) { + sb.append("Unrecognized token "); + } else { + sb.append("Unexpected token '").append(e.token.getText()).append("' "); + } + sb.append("at line ").append(e.line); + if ( e.approximateLineInfo ) { + sb.append(" (approximately)"); + } + sb.append(", column ").append(e.charPositionInLine); + sb.append(". Query: ").append(e.input.toString()); + + throw new AttributeExpressionLanguageParsingException(sb.toString()); + } +} + +// functions that return Strings +zeroArgString : (TO_UPPER | TO_LOWER | TRIM | TO_STRING | URL_ENCODE | URL_DECODE) LPAREN! RPAREN!; +oneArgString : ((SUBSTRING_BEFORE | SUBSTRING_BEFORE_LAST | SUBSTRING_AFTER | SUBSTRING_AFTER_LAST | REPLACE_NULL | REPLACE_EMPTY | + PREPEND | APPEND | FORMAT | STARTS_WITH | ENDS_WITH | CONTAINS | JOIN) LPAREN! anyArg RPAREN!) | + (TO_RADIX LPAREN! anyArg (COMMA! anyArg)? RPAREN!); +twoArgString : ((REPLACE | REPLACE_ALL) LPAREN! anyArg COMMA! anyArg RPAREN!) | + (SUBSTRING LPAREN! anyArg (COMMA! anyArg)? RPAREN!); + + +// functions that return Booleans +zeroArgBool : (IS_NULL | NOT_NULL | IS_EMPTY | NOT) LPAREN! RPAREN!; +oneArgBool : ((FIND | MATCHES | EQUALS_IGNORE_CASE) LPAREN! anyArg RPAREN!) | + (GREATER_THAN | LESS_THAN | GREATER_THAN_OR_EQUAL | LESS_THAN_OR_EQUAL) LPAREN! anyArg RPAREN! | + (EQUALS) LPAREN! anyArg RPAREN! | + (AND | OR) LPAREN! anyArg RPAREN!; + + +// functions that return Numbers +zeroArgNum : (LENGTH | TO_NUMBER | COUNT) LPAREN! RPAREN!; +oneArgNum : ((INDEX_OF | LAST_INDEX_OF) LPAREN! anyArg RPAREN!) | + (TO_DATE LPAREN! anyArg? RPAREN!) | + ((MOD | PLUS | MINUS | MULTIPLY | DIVIDE) LPAREN! anyArg RPAREN!); + +stringFunctionRef : zeroArgString | oneArgString | twoArgString; +booleanFunctionRef : zeroArgBool | oneArgBool; +numberFunctionRef : zeroArgNum | oneArgNum; + +anyArg : NUMBER | numberFunctionRef | STRING_LITERAL | zeroArgString | oneArgString | twoArgString | booleanLiteral | zeroArgBool | oneArgBool | expression; +stringArg : STRING_LITERAL | zeroArgString | oneArgString | twoArgString | expression; +functionRef : stringFunctionRef | booleanFunctionRef | numberFunctionRef; + + + +// Attribute Reference +subject : attrName | expression; +attrName : singleAttrName | multiAttrName; + +singleAttrRef : ATTRIBUTE_NAME | STRING_LITERAL; +singleAttrName : singleAttrRef -> + ^(ATTR_NAME singleAttrRef); + + +multiAttrFunction : ANY_ATTRIBUTE | ANY_MATCHING_ATTRIBUTE | ALL_ATTRIBUTES | ALL_MATCHING_ATTRIBUTES | ANY_DELINEATED_VALUE | ALL_DELINEATED_VALUES; +multiAttrName : multiAttrFunction LPAREN stringArg (COMMA stringArg)* RPAREN -> + ^(MULTI_ATTRIBUTE_REFERENCE multiAttrFunction stringArg*); + +attributeRef : subject -> + ^(ATTRIBUTE_REFERENCE subject); + + +functionCall : functionRef -> + ^(FUNCTION_CALL functionRef); + +booleanLiteral : TRUE | FALSE; +zeroArgStandaloneFunction : (IP | UUID | NOW | NEXT_INT | HOSTNAME) LPAREN! RPAREN!; +oneArgStandaloneFunction : (TO_LITERAL^ LPAREN! anyArg RPAREN!) | + (HOSTNAME^ LPAREN! booleanLiteral RPAREN!); +standaloneFunction : zeroArgStandaloneFunction | oneArgStandaloneFunction; + +attributeRefOrFunctionCall : (attributeRef | standaloneFunction); + +expression : DOLLAR LBRACE attributeRefOrFunctionCall (COLON functionCall)* RBRACE -> + ^(EXPRESSION attributeRefOrFunctionCall functionCall*); + +query : expression EOF -> + ^(QUERY expression); http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens new file mode 100755 index 0000000..0265bfb --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/antlr3/org/apache/nifi/attribute/expression/language/antlr/output/AttributeExpressionLexer.tokens @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +ALL_ATTRIBUTES=4 +ALL_DELINEATED_VALUES=5 +ALL_MATCHING_ATTRIBUTES=6 +AND=7 +ANY_ATTRIBUTE=8 +ANY_DELINEATED_VALUE=9 +ANY_MATCHING_ATTRIBUTE=10 +APPEND=11 +ATTRIBUTE_NAME=12 +CEIL=13 +COLON=14 +COMMA=15 +CONTAINS=16 +DIVIDE=17 +DOLLAR=18 +DOT=19 +ENDS_WITH=20 +EQUALS=21 +EQUALS_IGNORE_CASE=22 +FALSE=23 +FIND=24 +FLOOR=25 +FORMAT=26 +GREATER_THAN=27 +GREATER_THAN_OR_EQUAL=28 +HOSTNAME=29 +INDEX_OF=30 +IP=31 +IS_NULL=32 +LAST_INDEX_OF=33 +LBRACE=34 +LENGTH=35 +LESS_THAN=36 +LESS_THAN_OR_EQUAL=37 +LPAREN=38 +MATCHES=39 +MINUS=40 +MOD=41 +MULTIPLY=42 +NEXT_INT=43 +NOT=44 +NOT_NULL=45 +NOW=46 +NUMBER=47 +OR=48 +PLUS=49 +PREPEND=50 +RBRACE=51 +REPLACE=52 +REPLACE_ALL=53 +REPLACE_NULL=54 +RPAREN=55 +SEMICOLON=56 +STARTS_WITH=57 +STRING_LITERAL=58 +SUBSTRING=59 +SUBSTRING_AFTER=60 +SUBSTRING_AFTER_LAST=61 +SUBSTRING_BEFORE=62 +SUBSTRING_BEFORE_LAST=63 +TO_DATE=64 +TO_LOWER=65 +TO_NUMBER=66 +TO_RADIX=67 +TO_STRING=68 +TO_UPPER=69 +TRIM=70 +TRUE=71 +URL_DECODE=72 +URL_ENCODE=73 +UUID=74 +WHITESPACE=75 http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java new file mode 100644 index 0000000..81da47e --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/EmptyPreparedQuery.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.attribute.expression.language; + +import java.util.Map; + +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; + +public class EmptyPreparedQuery implements PreparedQuery { + + private final String value; + + EmptyPreparedQuery(final String value) { + this.value = value; + } + + @Override + public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions() throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(final AttributeValueDecorator decorator) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(final FlowFile flowFile) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(Map<String, String> attributes) throws ProcessException { + return value; + } + + @Override + public String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException { + return value; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java new file mode 100644 index 0000000..a29e792 --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/InvalidPreparedQuery.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.attribute.expression.language; + +import java.util.Map; + +import org.apache.nifi.attribute.expression.language.exception.AttributeExpressionLanguageException; +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; + +/** + * An implementation of PreparedQuery that throws an + * {@link AttributeExpressionLanguageException} when attempting to evaluate the + * query. This allows a PreparedQuery to be created, even though it can't be + * evaluated. + */ +public class InvalidPreparedQuery implements PreparedQuery { + + private final String query; + private final String explanation; + + public InvalidPreparedQuery(final String query, final String explanation) { + this.query = query; + this.explanation = explanation; + } + + @Override + public String evaluateExpressions(final FlowFile flowFile, final AttributeValueDecorator decorator) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + + @Override + public String evaluateExpressions() throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + + @Override + public String evaluateExpressions(final AttributeValueDecorator decorator) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + + @Override + public String evaluateExpressions(final FlowFile flowFile) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + + @Override + public String evaluateExpressions(final Map<String, String> attributes) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + + @Override + public String evaluateExpressions(final Map<String, String> attributes, final AttributeValueDecorator decorator) throws ProcessException { + throw new AttributeExpressionLanguageException("Invalid Expression: " + query + " due to " + explanation); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/aa998847/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java new file mode 100644 index 0000000..0d1b2c7 --- /dev/null +++ b/nifi-commons/nifi-expression-language/src/main/java/org/apache/nifi/attribute/expression/language/PreparedQuery.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.attribute.expression.language; + +import java.util.Map; + +import org.apache.nifi.expression.AttributeValueDecorator; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.exception.ProcessException; + +public interface PreparedQuery { + + String evaluateExpressions(FlowFile flowFile, AttributeValueDecorator decorator) throws ProcessException; + + String evaluateExpressions() throws ProcessException; + + String evaluateExpressions(AttributeValueDecorator decorator) throws ProcessException; + + String evaluateExpressions(FlowFile flowFile) throws ProcessException; + + String evaluateExpressions(Map<String, String> attributes) throws ProcessException; + + String evaluateExpressions(Map<String, String> attributes, AttributeValueDecorator decorator) throws ProcessException; + +}
