http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java new file mode 100644 index 0000000..9966c3d --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiArrayIndexPath.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.ArrayList; +import java.util.List; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.ArrayIndexFieldValue; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.NumericRange; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ArrayDataType; + +public class MultiArrayIndexPath extends RecordPathSegment { + private final List<NumericRange> indices; + + MultiArrayIndexPath(final List<NumericRange> indices, final RecordPathSegment parent, final boolean absolute) { + super(indices.toString(), parent, absolute); + this.indices = indices; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .filter(Filters.fieldTypeFilter(RecordFieldType.ARRAY)) + .flatMap(fieldValue -> { + final ArrayDataType arrayDataType = (ArrayDataType) fieldValue.getField().getDataType(); + final DataType elementDataType = arrayDataType.getElementType(); + final RecordField arrayField = new RecordField(fieldValue.getField().getFieldName(), elementDataType); + + final Object[] values = (Object[]) fieldValue.getValue(); + + return indices.stream() + .filter(range -> values.length > Math.abs(range.getMin())) + .flatMap(range -> { + final List<Object> valuesWithinRange = new ArrayList<>(); + + final int min = range.getMin() < 0 ? values.length + range.getMin() : range.getMin(); + final int max = range.getMax() < 0 ? values.length + range.getMax() : range.getMax(); + + for (int i = min; i <= max; i++) { + if (values.length > i) { + valuesWithinRange.add(values[i]); + } + } + + return IntStream.range(0, valuesWithinRange.size()) + .mapToObj(index -> { + final RecordField elementField = new RecordField(arrayField.getFieldName() + "[" + index + "]", elementDataType); + return new ArrayIndexFieldValue(valuesWithinRange.get(index), elementField, fieldValue, index); + }); + }); + + }); + } +}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java new file mode 100644 index 0000000..e9b1874 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/MultiMapKeyPath.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.List; +import java.util.Map; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.MapEntryFieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.MapDataType; + +public class MultiMapKeyPath extends RecordPathSegment { + private final List<String> mapKeys; + + MultiMapKeyPath(final List<String> mapKeys, final RecordPathSegment parent, final boolean absolute) { + super(mapKeys.toString(), parent, absolute); + this.mapKeys = mapKeys; + } + + @Override + @SuppressWarnings("unchecked") + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .filter(Filters.fieldTypeFilter(RecordFieldType.MAP)) + .flatMap(fieldValue -> { + final Map<String, ?> map = (Map<String, ?>) fieldValue.getValue(); + return mapKeys.stream().map(key -> { + final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + key + "']", valueType); + return new MapEntryFieldValue(map.get(key), elementField, fieldValue, key); + }); + }); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java new file mode 100644 index 0000000..bd5bf97 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/ParentPath.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; + + +public class ParentPath extends RecordPathSegment { + + ParentPath(final RecordPathSegment parent, final boolean absolute) { + super("..", parent, absolute); + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> stream; + final RecordPathSegment parentPath = getParentPath(); + if (parentPath == null) { + stream = Stream.of(context.getContextNode()); + } else { + stream = parentPath.evaluate(context); + } + + return Filters.presentValues(stream.map(fieldVal -> fieldVal.getParent())); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java new file mode 100644 index 0000000..759a848 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/PredicatePath.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.filter.RecordPathFilter; + +public class PredicatePath extends RecordPathSegment { + private final RecordPathFilter filter; + + public PredicatePath(final RecordPathSegment parent, final RecordPathFilter filter, final boolean absolute) { + super("[" + filter + "]", parent, absolute); + this.filter = filter; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> valueStream = getParentPath().evaluate(context); + + return valueStream.flatMap(fieldVal -> { + // For the duration of this Predicate, we want to consider the 'context node' to be + // whatever value is given to us in the field value. We then want to return the 'context node' + // back to what it was before this Predicate. + final FieldValue previousContextNode = context.getContextNode(); + context.setContextNode(fieldVal); + try { + // Really what we want to do is filter out Stream<FieldValue> but that becomes very difficult + // to implement for the RecordPathFilter's. So, instead, we pass the FieldValue to field and + // the RecordPathEvaluationContext and receive back a Stream<FieldValue>. Since this is a Predicate, + // though, we don't want to transform our Stream - we just want to filter it. So we handle this by + // mapping the result back to fieldVal. And since this predicate shouldn't return the same field multiple + // times, we will limit the stream to 1 element. We also filter out any FieldValue whose value is null. + // This is done because if we have a predicate like [./iDoNotExist != 'hello'] then the relative path will + // return a value of null and that will be compared to 'hello'. Since they are not equal, the NotEqualsFilter + // will return 'true', so we will get back a FieldValue with a null value. This should not make the Predicate + // true. + return filter.filter(fieldVal, context) + .filter(fv -> fv.getValue() != null) + .limit(1) + .map(ignore -> fieldVal); + } finally { + context.setContextNode(previousContextNode); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java new file mode 100644 index 0000000..24b872a --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathCompiler.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import static org.apache.nifi.record.path.RecordPathParser.ARRAY_INDEX; +import static org.apache.nifi.record.path.RecordPathParser.CHILD_REFERENCE; +import static org.apache.nifi.record.path.RecordPathParser.CURRENT_FIELD; +import static org.apache.nifi.record.path.RecordPathParser.DESCENDANT_REFERENCE; +import static org.apache.nifi.record.path.RecordPathParser.EQUAL; +import static org.apache.nifi.record.path.RecordPathParser.FIELD_NAME; +import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN; +import static org.apache.nifi.record.path.RecordPathParser.GREATER_THAN_EQUAL; +import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN; +import static org.apache.nifi.record.path.RecordPathParser.LESS_THAN_EQUAL; +import static org.apache.nifi.record.path.RecordPathParser.MAP_KEY; +import static org.apache.nifi.record.path.RecordPathParser.NOT_EQUAL; +import static org.apache.nifi.record.path.RecordPathParser.NUMBER; +import static org.apache.nifi.record.path.RecordPathParser.NUMBER_LIST; +import static org.apache.nifi.record.path.RecordPathParser.NUMBER_RANGE; +import static org.apache.nifi.record.path.RecordPathParser.PARENT_REFERENCE; +import static org.apache.nifi.record.path.RecordPathParser.PATH; +import static org.apache.nifi.record.path.RecordPathParser.PREDICATE; +import static org.apache.nifi.record.path.RecordPathParser.RELATIVE_PATH; +import static org.apache.nifi.record.path.RecordPathParser.ROOT_REFERENCE; +import static org.apache.nifi.record.path.RecordPathParser.STRING_LIST; +import static org.apache.nifi.record.path.RecordPathParser.STRING_LITERAL; +import static org.apache.nifi.record.path.RecordPathParser.WILDCARD; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiFunction; + +import org.antlr.runtime.tree.Tree; +import org.apache.nifi.record.path.NumericRange; +import org.apache.nifi.record.path.exception.RecordPathException; +import org.apache.nifi.record.path.filter.EqualsFilter; +import org.apache.nifi.record.path.filter.GreaterThanFilter; +import org.apache.nifi.record.path.filter.GreaterThanOrEqualFilter; +import org.apache.nifi.record.path.filter.LessThanFilter; +import org.apache.nifi.record.path.filter.LessThanOrEqualFilter; +import org.apache.nifi.record.path.filter.NotEqualsFilter; +import org.apache.nifi.record.path.filter.RecordPathFilter; + +public class RecordPathCompiler { + + public static RecordPathSegment compile(final Tree pathTree, final RecordPathSegment root, final boolean absolute) { + RecordPathSegment parent = root; + for (int i = 0; i < pathTree.getChildCount(); i++) { + final Tree child = pathTree.getChild(i); + parent = RecordPathCompiler.buildPath(child, parent, absolute); + } + + return parent; + } + + public static RecordPathSegment buildPath(final Tree tree, final RecordPathSegment parent, final boolean absolute) { + switch (tree.getType()) { + case ROOT_REFERENCE: { + return new RootPath(); + } + case CHILD_REFERENCE: { + final Tree childTree = tree.getChild(0); + final int childTreeType = childTree.getType(); + if (childTreeType == FIELD_NAME) { + final String childName = childTree.getChild(0).getText(); + return new ChildFieldPath(childName, parent, absolute); + } else if (childTreeType == WILDCARD) { + return new WildcardChildPath(parent, absolute); + } else { + throw new RecordPathException("Expected field name following '/' Token but found " + childTree); + } + } + case ARRAY_INDEX: { + final Tree indexListTree = tree.getChild(0); + if (indexListTree.getType() == NUMBER_LIST) { + if (indexListTree.getChildCount() == 1 && indexListTree.getChild(0).getType() == NUMBER) { + final Tree indexTree = indexListTree.getChild(0); + final int index = Integer.parseInt(indexTree.getText()); + return new ArrayIndexPath(index, parent, absolute); + } + + final List<NumericRange> indexList = new ArrayList<>(); + + for (int i = 0; i < indexListTree.getChildCount(); i++) { + final Tree indexTree = indexListTree.getChild(i); + if (indexTree.getType() == NUMBER) { + final int index = Integer.valueOf(indexTree.getText()); + indexList.add(new NumericRange(index, index)); + } else if (indexTree.getType() == NUMBER_RANGE) { + final int min = Integer.valueOf(indexTree.getChild(0).getText()); + final int max = Integer.valueOf(indexTree.getChild(1).getText()); + indexList.add(new NumericRange(min, max)); + } else { + throw new RecordPathException("Expected Number or Range following '[' Token but found " + indexTree); + } + } + + return new MultiArrayIndexPath(indexList, parent, absolute); + } else { + throw new RecordPathException("Expected Number or Range following '[' Token but found " + indexListTree); + } + } + case MAP_KEY: { + final Tree keyTree = tree.getChild(0); + if (keyTree.getType() == STRING_LIST) { + if (keyTree.getChildCount() == 1) { + return new SingularMapKeyPath(keyTree.getChild(0).getText(), parent, absolute); + } + + final List<String> keys = new ArrayList<>(keyTree.getChildCount()); + for (int i = 0; i < keyTree.getChildCount(); i++) { + keys.add(keyTree.getChild(i).getText()); + } + + return new MultiMapKeyPath(keys, parent, absolute); + } else { + throw new RecordPathException("Expected Map Key following '[' Token but found " + keyTree); + } + } + case WILDCARD: { + return new WildcardIndexPath(parent, absolute); + } + case DESCENDANT_REFERENCE: { + final Tree childTree = tree.getChild(0); + final int childTreeType = childTree.getType(); + if (childTreeType == FIELD_NAME) { + final String descendantName = childTree.getChild(0).getText(); + return new DescendantFieldPath(descendantName, parent, absolute); + } else { + throw new RecordPathException("Expected field name following '//' Token but found " + childTree); + } + } + case PARENT_REFERENCE: { + return new ParentPath(parent, absolute); + } + case CURRENT_FIELD: { + return new CurrentFieldPath(parent, absolute); + } + case STRING_LITERAL: { + return new LiteralValuePath(parent, tree.getText(), absolute); + } + case NUMBER: { + return new LiteralValuePath(parent, Integer.parseInt(tree.getText()), absolute); + } + case PREDICATE: { + final Tree operatorTree = tree.getChild(0); + final RecordPathFilter filter = createFilter(operatorTree, parent, absolute); + return new PredicatePath(parent, filter, absolute); + } + case RELATIVE_PATH: { + return compile(tree, parent, absolute); + } + case PATH: { + return compile(tree, new RootPath(), absolute); + } + } + + throw new RecordPathException("Encountered unexpected token " + tree); + } + + private static RecordPathFilter createFilter(final Tree operatorTree, final RecordPathSegment parent, final boolean absolute) { + switch (operatorTree.getType()) { + case EQUAL: + return createBinaryOperationFilter(operatorTree, parent, EqualsFilter::new, absolute); + case NOT_EQUAL: + return createBinaryOperationFilter(operatorTree, parent, NotEqualsFilter::new, absolute); + case LESS_THAN: + return createBinaryOperationFilter(operatorTree, parent, LessThanFilter::new, absolute); + case LESS_THAN_EQUAL: + return createBinaryOperationFilter(operatorTree, parent, LessThanOrEqualFilter::new, absolute); + case GREATER_THAN: + return createBinaryOperationFilter(operatorTree, parent, GreaterThanFilter::new, absolute); + case GREATER_THAN_EQUAL: + return createBinaryOperationFilter(operatorTree, parent, GreaterThanOrEqualFilter::new, absolute); + default: + throw new RecordPathException("Expected an Expression of form <value> <operator> <value> to follow '[' Token but found " + operatorTree); + } + } + + private static RecordPathFilter createBinaryOperationFilter(final Tree operatorTree, final RecordPathSegment parent, + final BiFunction<RecordPathSegment, RecordPathSegment, RecordPathFilter> function, final boolean absolute) { + final Tree lhsTree = operatorTree.getChild(0); + final Tree rhsTree = operatorTree.getChild(1); + final RecordPathSegment lhsPath = buildPath(lhsTree, parent, absolute); + final RecordPathSegment rhsPath = buildPath(rhsTree, parent, absolute); + return function.apply(lhsPath, rhsPath); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java new file mode 100644 index 0000000..92ff010 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RecordPathSegment.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.Objects; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.record.path.StandardRecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.Record; + +public abstract class RecordPathSegment implements RecordPath { + private final String path; + private final RecordPathSegment parentPath; + private final boolean absolute; + + RecordPathSegment(final String path, final RecordPathSegment parentPath, final boolean absolute) { + this.path = path; + this.parentPath = parentPath; + this.absolute = absolute; + } + + @Override + public String getPath() { + return path; + } + + RecordPathSegment getParentPath() { + return parentPath; + } + + @Override + public String toString() { + return getPath(); + } + + @Override + public boolean isAbsolute() { + return absolute; + } + + @Override + public int hashCode() { + return Objects.hash(path); + } + + @Override + public boolean equals(final Object obj) { + if (obj == this) { + return true; + } + if (obj == null) { + return false; + } + if (!(obj instanceof RecordPath)) { + return false; + } + + final RecordPath other = (RecordPath) obj; + return getPath().equals(other.getPath()); + } + + @Override + public final RecordPathResult evaluate(final Record record) { + final RecordPathEvaluationContext context = new StandardRecordPathEvaluationContext(record); + final Stream<FieldValue> selectedFields = evaluate(context); + + return new RecordPathResult() { + @Override + public String getPath() { + return RecordPathSegment.this.getPath(); + } + + @Override + public Stream<FieldValue> getSelectedFields() { + return selectedFields; + } + }; + } + + @Override + public final RecordPathResult evaluate(final FieldValue contextNode) { + final RecordPathEvaluationContext context; + if (Filters.isRecord(contextNode.getField().getDataType(), contextNode.getValue())) { + final Record record = (Record) contextNode.getValue(); + if (record == null) { + return new RecordPathResult() { + @Override + public String getPath() { + return RecordPathSegment.this.getPath(); + } + + @Override + public Stream<FieldValue> getSelectedFields() { + return Stream.empty(); + } + }; + } + + context = new StandardRecordPathEvaluationContext(record); + } else { + final FieldValue parent = contextNode.getParent().orElse(null); + if (parent == null) { + context = new StandardRecordPathEvaluationContext(null); + } else { + context = new StandardRecordPathEvaluationContext(parent.getParentRecord().orElse(null)); + } + } + + context.setContextNode(contextNode); + final Stream<FieldValue> selectedFields = evaluate(context); + + return new RecordPathResult() { + @Override + public String getPath() { + return RecordPathSegment.this.getPath(); + } + + @Override + public Stream<FieldValue> getSelectedFields() { + return selectedFields; + } + }; + } + + public abstract Stream<FieldValue> evaluate(RecordPathEvaluationContext context); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java new file mode 100644 index 0000000..6ec205e --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/RootPath.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class RootPath extends RecordPathSegment { + private static final String PATH = ""; + + public RootPath() { + super(PATH, null, true); + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final RecordField field = new RecordField("root", RecordFieldType.RECORD.getRecordDataType(context.getRecord().getSchema())); + final FieldValue fieldValue = new StandardFieldValue(context.getRecord(), field, null); + return Stream.of(fieldValue); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java new file mode 100644 index 0000000..ee57f36 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/SingularMapKeyPath.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.Map; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.MapEntryFieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.MapDataType; + +public class SingularMapKeyPath extends RecordPathSegment { + private final String mapKey; + + SingularMapKeyPath(final String mapKey, final RecordPathSegment parent, final boolean absolute) { + super("[" + mapKey + "]", parent, absolute); + this.mapKey = mapKey; + } + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .filter(Filters.fieldTypeFilter(RecordFieldType.MAP)) + .map(fieldValue -> { + final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + mapKey + "']", valueType); + return new MapEntryFieldValue(getMapValue(fieldValue), elementField, fieldValue, mapKey); + }); + } + + private Object getMapValue(final FieldValue fieldValue) { + final Map<?, ?> map = (Map<?, ?>) fieldValue.getValue(); + return map.get(mapKey); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java new file mode 100644 index 0000000..16513f9 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardChildPath.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.Optional; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.StandardFieldValue; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.Record; + +public class WildcardChildPath extends RecordPathSegment { + + WildcardChildPath(final RecordPathSegment parent, final boolean absolute) { + super("/*", parent, absolute); + } + + + @Override + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + return getParentPath().evaluate(context) + // map to Optional<FieldValue> containing child element + .flatMap(fieldVal -> getChildren(fieldVal)); + } + + private Stream<FieldValue> getChildren(final FieldValue fieldValue) { + if (fieldValue == null || fieldValue.getValue() == null || !Filters.isRecord(fieldValue)) { + return Stream.empty(); + } + + final Record record = (Record) fieldValue.getValue(); + return Filters.presentValues(record.getSchema().getFields().stream() + .map(field -> { + final Object value = record.getValue(field); + if (value == null) { + return Optional.empty(); + } + + return Optional.of(new StandardFieldValue(value, field, fieldValue)); + })); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java new file mode 100644 index 0000000..c2ce474 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/paths/WildcardIndexPath.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.paths; + +import java.util.Map; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.ArrayIndexFieldValue; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.MapEntryFieldValue; +import org.apache.nifi.record.path.RecordPathEvaluationContext; +import org.apache.nifi.record.path.util.Filters; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.MapDataType; + +public class WildcardIndexPath extends RecordPathSegment { + + WildcardIndexPath(final RecordPathSegment parent, final boolean absolute) { + super("[*]", parent, absolute); + } + + + @Override + @SuppressWarnings("unchecked") + public Stream<FieldValue> evaluate(final RecordPathEvaluationContext context) { + final Stream<FieldValue> parentResult = getParentPath().evaluate(context); + + return parentResult + .filter(Filters.fieldTypeFilter(RecordFieldType.MAP, RecordFieldType.ARRAY)) + .flatMap(fieldValue -> { + final RecordFieldType fieldType = fieldValue.getField().getDataType().getFieldType(); + + final Object value = fieldValue.getValue(); + if (value == null) { + return Stream.empty(); + } + + if (fieldType == RecordFieldType.MAP) { + final Map<String, ?> map = (Map<String, ?>) value; + return map.entrySet().stream() + .map(entry -> { + final DataType valueType = ((MapDataType) fieldValue.getField().getDataType()).getValueType(); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "['" + entry.getKey() + "']", valueType); + return new MapEntryFieldValue(entry.getValue(), elementField, fieldValue, entry.getKey()); + }); + } else { + final Object[] array = (Object[]) value; + return IntStream.range(0, array.length) + .mapToObj(index -> { + final DataType elementDataType = ((ArrayDataType) fieldValue.getField().getDataType()).getElementType(); + final RecordField elementField = new RecordField(fieldValue.getField().getFieldName() + "[" + index + "]", elementDataType); + return new ArrayIndexFieldValue(array[index], elementField, fieldValue, index); + }); + } + }); + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java new file mode 100644 index 0000000..4800d59 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/Filters.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.util; + +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; + +public class Filters { + + public static Predicate<FieldValue> fieldTypeFilter(final RecordFieldType fieldType, final RecordFieldType... alternativeTypes) { + return fieldVal -> { + final RecordFieldType recordFieldType = fieldVal.getField().getDataType().getFieldType(); + if (recordFieldType == fieldType) { + return true; + } + + for (final RecordFieldType alternate : alternativeTypes) { + if (recordFieldType == alternate) { + return true; + } + } + + return false; + }; + } + + public static <T> Stream<T> presentValues(final Stream<Optional<T>> stream) { + return stream.filter(opt -> opt.isPresent()) + .map(opt -> opt.get()); + } + + public static boolean isRecord(final FieldValue fieldValue) { + final DataType dataType = fieldValue.getField().getDataType(); + final Object value = fieldValue.getValue(); + return isRecord(dataType, value); + } + + public static boolean isRecord(final DataType dataType, final Object value) { + if (dataType.getFieldType() == RecordFieldType.RECORD) { + return true; + } + + if (value == null) { + return false; + } + + if (value instanceof Record) { + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java new file mode 100644 index 0000000..243ad11 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/util/RecordPathCache.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.util; + +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.nifi.record.path.RecordPath; + +public class RecordPathCache { + private final Map<String, RecordPath> compiledRecordPaths; + + public RecordPathCache(final int cacheSize) { + compiledRecordPaths = new LinkedHashMap<String, RecordPath>() { + @Override + protected boolean removeEldestEntry(final Map.Entry<String, RecordPath> eldest) { + return size() >= cacheSize; + } + }; + } + + public RecordPath getCompiled(final String path) { + RecordPath compiled; + synchronized (this) { + compiled = compiledRecordPaths.get(path); + } + + if (compiled != null) { + return compiled; + } + + compiled = RecordPath.compile(path); + + synchronized (this) { + final RecordPath existing = compiledRecordPaths.putIfAbsent(path, compiled); + if (existing != null) { + compiled = existing; + } + } + + return compiled; + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java new file mode 100644 index 0000000..0a53335 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathPropertyNameValidator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.validation; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.exception.RecordPathException; + +public class RecordPathPropertyNameValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + try { + RecordPath.compile(subject); + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Valid RecordPath") + .build(); + } catch (final RecordPathException e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Property Name is not a valid RecordPath value: " + e.getMessage()) + .build(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java new file mode 100644 index 0000000..ef5e599 --- /dev/null +++ b/nifi-commons/nifi-record-path/src/main/java/org/apache/nifi/record/path/validation/RecordPathValidator.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.record.path.validation; + +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.exception.RecordPathException; + +public class RecordPathValidator implements Validator { + + @Override + public ValidationResult validate(final String subject, final String input, final ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Property uses Expression Language so no further validation is possible") + .build(); + } + + try { + RecordPath.compile(input); + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(true) + .explanation("Valid RecordPath") + .build(); + } catch (final RecordPathException e) { + return new ValidationResult.Builder() + .input(input) + .subject(subject) + .valid(false) + .explanation("Property Value is not a valid RecordPath value: " + e.getMessage()) + .build(); + } + } + +}
