This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e964fb NIFI-4792: Add support for querying array fields in QueryRecord
5e964fb is described below
commit 5e964fbc474034cb5342e22aa290f1040af377ea
Author: David Savage <[email protected]>
AuthorDate: Sat Jan 25 17:27:47 2020 +0000
NIFI-4792: Add support for querying array fields in QueryRecord
Work in progress: add support for array-based queries
Update the Calcite dependency
Tidy up unused imports flagged by Checkstyle in the Travis build
Tidy up closing braces flagged by Checkstyle in the Travis build
Add a test for the use case referenced in NIFI-4792
Bump the Calcite version to 1.21.0
Signed-off-by: Matthew Burgess <[email protected]>
This closes #4015
---
.../nifi/processors/standard/QueryRecord.java | 36 ++++-
.../nifi/queryrecord/FlowFileEnumerator.java | 19 ++-
.../org/apache/nifi/queryrecord/FlowFileTable.java | 5 +-
.../nifi/processors/standard/TestQueryRecord.java | 158 +++++++++++++++++++++
nifi-nar-bundles/nifi-standard-bundle/pom.xml | 2 +-
5 files changed, 214 insertions(+), 6 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 5136e67..82aea6f 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -610,14 +610,15 @@ public class QueryRecord extends AbstractProcessor {
if (record == null) {
return null;
}
-
if (record instanceof Record) {
return eval((Record) record, recordPath);
}
if (record instanceof Record[]) {
return eval((Record[]) record, recordPath);
}
-
+ if (record instanceof Iterable) {
+ return eval((Iterable<Record>) record, recordPath);
+ }
if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath);
}
@@ -645,6 +646,18 @@ public class QueryRecord extends AbstractProcessor {
return evalResults(selectedFields);
}
+ private Object eval(final Iterable<Record> records, final String
recordPath) {
+ final RecordPath compiled =
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+ final List<FieldValue> selectedFields = new ArrayList<>();
+ for (final Record record : records) {
+ final RecordPathResult result = compiled.evaluate(record);
+ result.getSelectedFields().forEach(selectedFields::add);
+ }
+
+ return evalResults(selectedFields);
+ }
+
private Object eval(final Record[] records, final String recordPath) {
final RecordPath compiled =
RECORD_PATH_CACHE.getCompiled(recordPath);
@@ -794,6 +807,8 @@ public class QueryRecord extends AbstractProcessor {
return eval((Record) record, recordPath, transform);
} else if (record instanceof Record[]) {
return eval((Record[]) record, recordPath, transform);
+ } else if (record instanceof Iterable) {
+ return eval((Iterable<Record>) record, recordPath, transform);
} else if (record instanceof Map) {
return eval((Map<?, ?>) record, recordPath, transform);
}
@@ -837,6 +852,23 @@ public class QueryRecord extends AbstractProcessor {
return evalResults(selectedFields.stream(), transform, () ->
"RecordPath " + recordPath + " resulted in more than one return value. The
RecordPath must be further constrained.");
}
+ private <T> T eval(final Iterable<Record> records, final String
recordPath, final Function<Object, T> transform) {
+ final RecordPath compiled =
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+ final List<FieldValue> selectedFields = new ArrayList<>();
+ for (final Record record : records) {
+ final RecordPathResult result = compiled.evaluate(record);
+ result.getSelectedFields().forEach(selectedFields::add);
+ }
+
+ if (selectedFields.isEmpty()) {
+ return null;
+ }
+
+ return evalResults(selectedFields.stream(), transform, () ->
"RecordPath " + recordPath + " resulted in more than one return value. The
RecordPath must be further constrained.");
+ }
+
+
private <T> T evalResults(final Stream<FieldValue> fields, final
Function<Object, T> transform, final Supplier<String>
multipleReturnValueErrorSupplier) {
return fields.map(FieldValue::getValue)
.filter(Objects::nonNull)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
index 06ffb76..e4814ec 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
@@ -27,6 +27,9 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.Record;
import java.io.InputStream;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
public class FlowFileEnumerator implements Enumerator<Object> {
private final ProcessSession session;
@@ -111,12 +114,26 @@ public class FlowFileEnumerator implements
Enumerator<Object> {
final Object[] filtered = new Object[fields.length];
for (int i = 0; i < fields.length; i++) {
final int indexToKeep = fields[i];
- filtered[i] = row[indexToKeep];
+ filtered[i] = cast(row[indexToKeep]);
}
return filtered;
}
+ private Object cast(Object o) {
+ if (o == null) {
+ return null;
+ } else if (o.getClass().isArray()) {
+ List<Object> l = new ArrayList(Array.getLength(o));
+ for (int i = 0; i < Array.getLength(o); i++) {
+ l.add(Array.get(o, i));
+ }
+ return l;
+ } else {
+ return o;
+ }
+ }
+
@Override
public void reset() {
if (rawIn != null) {
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 8c0e2ce..3030008 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
import java.lang.reflect.Type;
import java.math.BigInteger;
@@ -51,7 +52,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-
public class FlowFileTable extends AbstractTable implements QueryableTable,
TranslatableTable {
private final RecordReaderFactory recordReaderFactory;
@@ -214,7 +214,8 @@ public class FlowFileTable extends AbstractTable implements
QueryableTable, Tran
case STRING:
return typeFactory.createJavaType(String.class);
case ARRAY:
- return typeFactory.createJavaType(Object[].class);
+ ArrayDataType array = (ArrayDataType) fieldType;
+ return
typeFactory.createArrayType(getRelDataType(array.getElementType(),
typeFactory), -1);
case RECORD:
return typeFactory.createJavaType(Record.class);
case MAP:
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 5b17327..0c26980 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
@@ -213,6 +214,132 @@ public class TestQueryRecord {
}
@Test
+ public void testCollectionFunctionsWithArray() throws
InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name" +
+ " FROM FLOWFILE" +
+ " WHERE CARDINALITY(addresses) > 1");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ }
+
+ @Test
+ public void testCollectionFunctionsWithWhereClause() throws
InitializationException {
+ final Record sample = createTaggedRecord("1", "a", "b", "c");
+
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(sample.getSchema());
+ recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
+ recordReader.addRecord(createTaggedRecord("2", "b", "e"));
+ recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
+
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(sample.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT id, tags FROM FLOWFILE CROSS JOIN
UNNEST(FLOWFILE.tags) AS f(tag) WHERE tag IN ('a','b')");
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(2, written.size());
+
+ final Record output0 = written.get(0);
+ assertEquals("1", output0.getValue("id"));
+ assertArrayEquals(new Object[]{"a", "d", "g"}, (Object[])
output0.getValue("tags"));
+
+ final Record output1 = written.get(1);
+ assertEquals("2", output1.getValue("id"));
+ assertArrayEquals(new Object[]{"b", "e"}, (Object[])
output1.getValue("tags"));
+ }
+
+
+ @Test
+ public void testArrayColumnWithIndex() throws InitializationException {
+ final Record sample = createTaggedRecord("1", "a", "b", "c");
+
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(sample.getSchema());
+ recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
+ recordReader.addRecord(createTaggedRecord("2", "b", "e"));
+ recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
+
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(sample.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT id, tags[1] as first, tags[2] as \"second\",
tags[CARDINALITY(tags)] as last FROM FLOWFILE");
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(3, written.size());
+
+ final Record output0 = written.get(0);
+ assertEquals("1", output0.getValue("id"));
+ assertEquals("a", output0.getValue("first"));
+ assertEquals("d", output0.getValue("second"));
+ assertEquals("g", output0.getValue("last"));
+
+ final Record output1 = written.get(1);
+ assertEquals("2", output1.getValue("id"));
+ assertEquals("b", output1.getValue("first"));
+ assertEquals("e", output1.getValue("second"));
+ assertEquals("e", output1.getValue("last"));
+
+ final Record output2 = written.get(2);
+ assertEquals("3", output2.getValue("id"));
+ assertEquals("c", output2.getValue("first"));
+ assertEquals("f", output2.getValue("second"));
+ assertEquals("h", output2.getValue("last"));
+ }
+
+ @Test
public void testCompareResultsOfTwoRecordPathsAgainstArray() throws
InitializationException {
final Record record = createHierarchicalArrayRecord();
@@ -382,6 +509,12 @@ public class TestQueryRecord {
* "mother": {
* "name": "Jane Doe"
* }
+ * },
+ * "favouriteThings": {
+ * "sport": "basketball",
+ * "color": "green",
+ * "roses": "raindrops",
+ * "kittens": "whiskers"
* }
* }
* </pre></code>
@@ -441,6 +574,31 @@ public class TestQueryRecord {
/**
* Returns a Record that, if written in JSON, would look like:
* <code><pre>
+ * {
+ * "id": >id<,
+ * "tags": [>tag1<,>tag2<...]
+ * }
+ * </pre></code>
+ *
+ * @return the Record
+ */
+ private Record createTaggedRecord(String id, String...tags) {
+ final List<RecordField> recordSchemaFields = new ArrayList<>();
+ recordSchemaFields.add(new RecordField("id",
RecordFieldType.STRING.getDataType()));
+ recordSchemaFields.add(new RecordField("tags",
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+ final RecordSchema recordSchema = new
SimpleRecordSchema(recordSchemaFields);
+
+ final Map<String, Object> map = new HashMap<>();
+ map.put("id", id);
+ map.put("tags", Arrays.asList(tags));
+
+ final Record record = new MapRecord(recordSchema, map);
+ return record;
+ }
+
+ /**
+ * Returns a Record that, if written in JSON, would look like:
+ * <code><pre>
* {
* "name": "John Doe",
* "title": "Software Engineer",
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index c110d60..f87d71c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -372,7 +372,7 @@
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
- <version>1.17.0</version>
+ <version>1.21.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>