mattyb149 commented on a change in pull request #3223: NIFI-5903: Allow 
RecordPath to be used in QueryRecord processor. Also…
URL: https://github.com/apache/nifi/pull/3223#discussion_r255659720
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java
 ##########
 @@ -565,4 +588,232 @@ public Connection getConnection() {
             return connection;
         }
     }
+
+
+    // ------------------------------------------------------------
+    // User-Defined Functions for Calcite
+    // ------------------------------------------------------------
+
+
+    public static class ObjectRecordPath extends RecordPathFunction {
+        public Object eval(Object record, String recordPath) {
+            if (record == null) {
+                return null;
+            }
+
+            if (record instanceof Record) {
+                return eval((Record) record, recordPath);
+            }
+            if (record instanceof Record[]) {
+                return eval((Record[]) record, recordPath);
+            }
+
 
 Review comment:
   Do we need to support Map types here too? I tried with the following input 
flow file (it's a one-record JSON array, so one provenance event using the 
default writer that outputs an array):
   
   ```[ {
     "eventId" : "0ffa4354-b103-4598-8750-ba068b81518f",
     "eventOrdinal" : 6,
     "eventType" : "DROP",
     "timestampMillis" : 1549911031378,
     "timestamp" : "2019-02-11T18:50:31.378Z",
     "durationMillis" : 80,
     "lineageStart" : 1549911027454,
     "details" : "Auto-Terminated by original Relationship",
     "componentId" : "dd5d2593-0168-1000-02cb-328065b4b614",
     "componentType" : "QueryRecord",
     "componentName" : "QueryRecord",
     "processGroupId" : "5d7e55f8-0165-1000-1170-d42741815ddb",
     "processGroupName" : "test",
     "entityId" : "532ae9b0-fbd7-437b-af5d-a1c98c3cb806",
     "entityType" : "org.apache.nifi.flowfile.FlowFile",
     "entitySize" : 2926,
     "previousEntitySize" : 2926,
     "updatedAttributes" : { },
     "previousAttributes" : {
       "avro.schema" : 
"{\"type\":\"record\",\"name\":\"nifiRecord\",\"namespace\":\"org.apache.nifi\",\"fields\":[{\"name\":\"zip\",\"type\":[\"null\",\"int\"]},{\"name\":\"thumbnail\",\"type\":[\"null\",\"string\"]},{\"name\":\"last\",\"type\":[\"null\",\"string\"]},{\"name\":\"gender\",\"type\":[\"null\",\"string\"]},{\"name\":\"large\",\"type\":[\"null\",\"string\"]},{\"name\":\"city\",\"type\":[\"null\",\"string\"]},{\"name\":\"date_of_birth\",\"type\":[\"null\",\"string\"]},{\"name\":\"medium\",\"type\":[\"null\",\"string\"]},{\"name\":\"title\",\"type\":[\"null\",\"string\"]},{\"name\":\"cell\",\"type\":[\"null\",\"string\"]},{\"name\":\"version\",\"type\":[\"null\",\"float\"]},{\"name\":\"ssn\",\"type\":[\"null\",\"string\"]},{\"name\":\"reg_date\",\"type\":[\"null\",\"string\"]},{\"name\":\"password\",\"type\":[\"null\",\"string\"]},{\"name\":\"nationality\",\"type\":[\"null\",\"string\"]},{\"name\":\"phone\",\"type\":[\"null\",\"string\"]},{\"name\":\"street\",\"type\":[\"null\",\"string\"]},{\"name\":\"id\",\"type\":[\"null\",\"int\"]},{\"name\":\"state\",\"type\":[\"null\",\"string\"]},{\"name\":\"first\",\"type\":[\"null\",\"string\"]},{\"name\":\"email\",\"type\":[\"null\",\"string\"]},{\"name\":\"username\",\"type\":[\"null\",\"string\"]}]}",
       "file.group" : "staff",
       "file.lastModifiedTime" : "2016-07-08T15:02:06-0400",
       "mime.type" : "application/json",
       "file.permissions" : "rw-r--r--",
       "uuid" : "532ae9b0-fbd7-437b-af5d-a1c98c3cb806",
       "absolute.path" : "/Users/mburgess/datasets/",
       "path" : "/",
       "filename" : "mysql_users_export.csv",
       "record.count" : "5",
       "file.creationTime" : "2016-07-08T15:02:06-0400",
       "file.lastAccessTime" : "2019-02-11T13:50:27-0500",
       "file.owner" : "mburgess"
     },
     "actorHostname" : "192.168.0.7",
     "contentURI" : 
"http://192.168.0.7:8080/nifi-api/provenance-events/6/content/output";,
     "previousContentURI" : 
"http://192.168.0.7:8080/nifi-api/provenance-events/6/content/input";,
     "parentIds" : [ ],
     "childIds" : [ ],
     "platform" : "nifi",
     "application" : "NiFi Flow"
   } ]
   ```
   
   I tried to execute the following query (using both RPATH and RPATH_STRING):
   
   `select * from flowfile where RPATH(previousAttributes, '/file.group') = 
'staff'`
   
   and get the following error:
   
   ```2019-02-11 14:30:42,620 ERROR [Timer-Driven Process Thread-1] 
o.a.nifi.processors.standard.QueryRecord 
QueryRecord[id=b60b157b-1000-1166-2f19-4ab316fba6ce] Unable to query 
StandardFlowFileRecord[uuid=bcdead9d-4516-4e24-9607-f1e84f0ba370,claim=StandardContentClaim
 [resourceClaim=StandardResourceClaim[id=1549912209914-29, container=default, 
section=29], offset=484272, 
length=18883],offset=0,name=1af7cbde-083c-4cfb-bb09-d757211975f0,size=18883] 
due to java.lang.RuntimeException: Cannot evaluate RecordPath /file.group 
against given argument because the argument is of type class java.util.HashMap 
instead of Record: java.lang.RuntimeException: Cannot evaluate RecordPath 
/file.group against given argument because the argument is of type class 
java.util.HashMap instead of Record
   java.lang.RuntimeException: Cannot evaluate RecordPath /file.group against 
given argument because the argument is of type class java.util.HashMap instead 
of Record
        at 
org.apache.nifi.processors.standard.QueryRecord$RecordPathFunction.eval(QueryRecord.java:775)
        at 
org.apache.nifi.processors.standard.QueryRecord$StringRecordPath.eval(QueryRecord.java:658)
   ```
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to