This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e964fb  NIFI-4792: Add support for querying array fields in QueryRecord
5e964fb is described below

commit 5e964fbc474034cb5342e22aa290f1040af377ea
Author: David Savage <[email protected]>
AuthorDate: Sat Jan 25 17:27:47 2020 +0000

    NIFI-4792: Add support for querying array fields in QueryRecord
    
    Work in progress adding support for array based queries
    updated calcite dependency
    
    tidy up unused imports highlighted by checkstyle in travis build
    
    tidy up }s highlighted by checkstyle in travis build
    
    Add test for use case referenced in NIFI-4792
    
    Bumped Calcite version to 1.21.0
    
    Signed-off-by: Matthew Burgess <[email protected]>
    
    This closes #4015
---
 .../nifi/processors/standard/QueryRecord.java      |  36 ++++-
 .../nifi/queryrecord/FlowFileEnumerator.java       |  19 ++-
 .../org/apache/nifi/queryrecord/FlowFileTable.java |   5 +-
 .../nifi/processors/standard/TestQueryRecord.java  | 158 +++++++++++++++++++++
 nifi-nar-bundles/nifi-standard-bundle/pom.xml      |   2 +-
 5 files changed, 214 insertions(+), 6 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
index 5136e67..82aea6f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
@@ -610,14 +610,15 @@ public class QueryRecord extends AbstractProcessor {
             if (record == null) {
                 return null;
             }
-
             if (record instanceof Record) {
                 return eval((Record) record, recordPath);
             }
             if (record instanceof Record[]) {
                 return eval((Record[]) record, recordPath);
             }
-
+            if (record instanceof Iterable) {
+                return eval((Iterable<Record>) record, recordPath);
+            }
             if (record instanceof Map) {
                 return eval((Map<?, ?>) record, recordPath);
             }
@@ -645,6 +646,18 @@ public class QueryRecord extends AbstractProcessor {
             return evalResults(selectedFields);
         }
 
+        private Object eval(final Iterable<Record> records, final String 
recordPath) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final List<FieldValue> selectedFields = new ArrayList<>();
+            for (final Record record : records) {
+                final RecordPathResult result = compiled.evaluate(record);
+                result.getSelectedFields().forEach(selectedFields::add);
+            }
+
+            return evalResults(selectedFields);
+        }
+
         private Object eval(final Record[] records, final String recordPath) {
             final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
 
@@ -794,6 +807,8 @@ public class QueryRecord extends AbstractProcessor {
                 return eval((Record) record, recordPath, transform);
             } else if (record instanceof Record[]) {
                 return eval((Record[]) record, recordPath, transform);
+            } else if (record instanceof Iterable) {
+                return eval((Iterable<Record>) record, recordPath, transform);
             } else if (record instanceof Map) {
                 return eval((Map<?, ?>) record, recordPath, transform);
             }
@@ -837,6 +852,23 @@ public class QueryRecord extends AbstractProcessor {
             return evalResults(selectedFields.stream(), transform, () -> 
"RecordPath " + recordPath + " resulted in more than one return value. The 
RecordPath must be further constrained.");
         }
 
+        private <T> T  eval(final Iterable<Record> records, final String 
recordPath, final Function<Object, T> transform) {
+            final RecordPath compiled = 
RECORD_PATH_CACHE.getCompiled(recordPath);
+
+            final List<FieldValue> selectedFields = new ArrayList<>();
+            for (final Record record : records) {
+                final RecordPathResult result = compiled.evaluate(record);
+                result.getSelectedFields().forEach(selectedFields::add);
+            }
+
+            if (selectedFields.isEmpty()) {
+                return null;
+            }
+
+            return evalResults(selectedFields.stream(), transform, () -> 
"RecordPath " + recordPath + " resulted in more than one return value. The 
RecordPath must be further constrained.");
+        }
+
+
         private <T> T evalResults(final Stream<FieldValue> fields, final 
Function<Object, T> transform, final Supplier<String> 
multipleReturnValueErrorSupplier) {
             return fields.map(FieldValue::getValue)
                 .filter(Objects::nonNull)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
index 06ffb76..e4814ec 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
@@ -27,6 +27,9 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 
 import java.io.InputStream;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.List;
 
 public class FlowFileEnumerator implements Enumerator<Object> {
     private final ProcessSession session;
@@ -111,12 +114,26 @@ public class FlowFileEnumerator implements Enumerator<Object> {
         final Object[] filtered = new Object[fields.length];
         for (int i = 0; i < fields.length; i++) {
             final int indexToKeep = fields[i];
-            filtered[i] = row[indexToKeep];
+            filtered[i] = cast(row[indexToKeep]);
         }
 
         return filtered;
     }
 
+    private Object cast(Object o) {
+        if (o == null) {
+            return null;
+        } else if (o.getClass().isArray()) {
+            List<Object> l = new ArrayList(Array.getLength(o));
+            for (int i = 0; i < Array.getLength(o); i++) {
+                l.add(Array.get(o, i));
+            }
+            return l;
+        } else {
+            return o;
+        }
+    }
+
     @Override
     public void reset() {
         if (rawIn != null) {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
index 8c0e2ce..3030008 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileTable.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
 
 import java.lang.reflect.Type;
 import java.math.BigInteger;
@@ -51,7 +52,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-
 public class FlowFileTable extends AbstractTable implements QueryableTable, 
TranslatableTable {
 
     private final RecordReaderFactory recordReaderFactory;
@@ -214,7 +214,8 @@ public class FlowFileTable extends AbstractTable implements QueryableTable, Tran
             case STRING:
                 return typeFactory.createJavaType(String.class);
             case ARRAY:
-                return typeFactory.createJavaType(Object[].class);
+                ArrayDataType array = (ArrayDataType) fieldType;
+                return 
typeFactory.createArrayType(getRelDataType(array.getElementType(), 
typeFactory), -1);
             case RECORD:
                 return typeFactory.createJavaType(Record.class);
             case MAP:
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 5b17327..0c26980 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
@@ -213,6 +214,132 @@ public class TestQueryRecord {
     }
 
     @Test
+    public void testCollectionFunctionsWithArray() throws 
InitializationException {
+        final Record record = createHierarchicalArrayRecord();
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(record.getSchema());
+        recordReader.addRecord(record);
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(record.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+                "SELECT title, name" +
+                        "    FROM FLOWFILE" +
+                        "    WHERE CARDINALITY(addresses) > 1");
+
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(1, written.size());
+
+        final Record output = written.get(0);
+        assertEquals("John Doe", output.getValue("name"));
+        assertEquals("Software Engineer", output.getValue("title"));
+    }
+
+    @Test
+    public void testCollectionFunctionsWithWhereClause() throws 
InitializationException {
+        final Record sample = createTaggedRecord("1", "a", "b", "c");
+
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(sample.getSchema());
+        recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
+        recordReader.addRecord(createTaggedRecord("2", "b", "e"));
+        recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
+
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(sample.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+                "SELECT id, tags FROM FLOWFILE CROSS JOIN 
UNNEST(FLOWFILE.tags) AS f(tag) WHERE tag IN ('a','b')");
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(2, written.size());
+
+        final Record output0 = written.get(0);
+        assertEquals("1", output0.getValue("id"));
+        assertArrayEquals(new Object[]{"a", "d", "g"}, (Object[]) 
output0.getValue("tags"));
+
+        final Record output1 = written.get(1);
+        assertEquals("2", output1.getValue("id"));
+        assertArrayEquals(new Object[]{"b", "e"}, (Object[]) 
output1.getValue("tags"));
+    }
+
+
+    @Test
+    public void testArrayColumnWithIndex() throws InitializationException {
+        final Record sample = createTaggedRecord("1", "a", "b", "c");
+
+        final ArrayListRecordReader recordReader = new 
ArrayListRecordReader(sample.getSchema());
+        recordReader.addRecord(createTaggedRecord("1", "a", "d", "g"));
+        recordReader.addRecord(createTaggedRecord("2", "b", "e"));
+        recordReader.addRecord(createTaggedRecord("3", "c", "f", "h"));
+
+
+        final ArrayListRecordWriter writer = new 
ArrayListRecordWriter(sample.getSchema());
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("reader", recordReader);
+        runner.enableControllerService(recordReader);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+        runner.setProperty(REL_NAME,
+                "SELECT id, tags[1] as first, tags[2] as \"second\", 
tags[CARDINALITY(tags)] as last FROM FLOWFILE");
+        runner.enqueue(new byte[0]);
+
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+
+        final List<Record> written = writer.getRecordsWritten();
+        assertEquals(3, written.size());
+
+        final Record output0 = written.get(0);
+        assertEquals("1", output0.getValue("id"));
+        assertEquals("a", output0.getValue("first"));
+        assertEquals("d", output0.getValue("second"));
+        assertEquals("g", output0.getValue("last"));
+
+        final Record output1 = written.get(1);
+        assertEquals("2", output1.getValue("id"));
+        assertEquals("b", output1.getValue("first"));
+        assertEquals("e", output1.getValue("second"));
+        assertEquals("e", output1.getValue("last"));
+
+        final Record output2 = written.get(2);
+        assertEquals("3", output2.getValue("id"));
+        assertEquals("c", output2.getValue("first"));
+        assertEquals("f", output2.getValue("second"));
+        assertEquals("h", output2.getValue("last"));
+    }
+
+    @Test
     public void testCompareResultsOfTwoRecordPathsAgainstArray() throws 
InitializationException {
         final Record record = createHierarchicalArrayRecord();
 
@@ -382,6 +509,12 @@ public class TestQueryRecord {
      *        "mother": {
      *          "name": "Jane Doe"
      *        }
+     *    },
+     *    "favouriteThings": {
+     *       "sport": "basketball",
+     *       "color": "green",
+     *       "roses": "raindrops",
+     *       "kittens": "whiskers"
      *    }
      * }
      * </pre></code>
@@ -441,6 +574,31 @@ public class TestQueryRecord {
     /**
      * Returns a Record that, if written in JSON, would look like:
      * <code><pre>
+     * {
+     *    "id": &gt;id&lt;,
+     *    "tags": [&gt;tag1&lt;,&gt;tag2&lt;...]
+     * }
+     * </pre></code>
+     *
+     * @return the Record
+     */
+    private Record createTaggedRecord(String id, String...tags) {
+        final List<RecordField> recordSchemaFields = new ArrayList<>();
+        recordSchemaFields.add(new RecordField("id", 
RecordFieldType.STRING.getDataType()));
+        recordSchemaFields.add(new RecordField("tags", 
RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType())));
+        final RecordSchema recordSchema = new 
SimpleRecordSchema(recordSchemaFields);
+
+        final Map<String, Object> map = new HashMap<>();
+        map.put("id", id);
+        map.put("tags", Arrays.asList(tags));
+
+        final Record record = new MapRecord(recordSchema, map);
+        return record;
+    }
+
+    /**
+     * Returns a Record that, if written in JSON, would look like:
+     * <code><pre>
      *          {
      *               "name": "John Doe",
      *               "title": "Software Engineer",
diff --git a/nifi-nar-bundles/nifi-standard-bundle/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
index c110d60..f87d71c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/pom.xml
@@ -372,7 +372,7 @@
             <dependency>
                 <groupId>org.apache.calcite</groupId>
                 <artifactId>calcite-core</artifactId>
-                <version>1.17.0</version>
+                <version>1.21.0</version>
             </dependency>
             <dependency>
                 <groupId>org.apache.avro</groupId>

Reply via email to