markap14 commented on a change in pull request #3223: NIFI-5903: Allow
RecordPath to be used in QueryRecord processor. Also…
URL: https://github.com/apache/nifi/pull/3223#discussion_r256110244
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
##########
@@ -70,6 +78,398 @@ public TestRunner getRunner() {
return runner;
}
+
+ @Test
+ public void testRecordPathFunctions() throws InitializationException {
+ final Record record = createHierarchicalRecord();
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT RPATH_STRING(person, '/name') AS name," +
+ " RPATH_INT(person, '/age') AS age," +
+ " RPATH(person, '/name') AS nameObj," +
+ " RPATH(person, '/age') AS ageObj," +
+ " RPATH(person, '/favoriteColors') AS colors," +
+ " RPATH(person, '//name') AS names," +
+ " RPATH_DATE(person, '/dob') AS dob," +
+ " RPATH_LONG(person, '/dobTimestamp') AS dobTimestamp," +
+ " RPATH_DATE(person, 'toDate(/joinTimestamp, \"yyyy-MM-dd\")') AS joinTime, " +
+ " RPATH_DOUBLE(person, '/weight') AS weight" +
+ " FROM FLOWFILE");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("John Doe", output.getValue("nameObj"));
+ assertEquals(30, output.getValue("age"));
+ assertEquals(30, output.getValue("ageObj"));
+ assertArrayEquals(new String[] { "red", "green"}, (Object[])
output.getValue("colors"));
+ assertArrayEquals(new String[] { "John Doe", "Jane Doe"}, (Object[])
output.getValue("names"));
+ assertEquals("1517702400000", output.getAsString("joinTime"));
+ assertEquals(Double.valueOf(180.8D), output.getAsDouble("weight"));
+ }
+
+ @Test
+ public void testRecordPathInAggregate() throws InitializationException {
+ final Record record = createHierarchicalRecord();
+
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ for (int i=0; i < 100; i++) {
+ final Record toAdd = createHierarchicalRecord();
+ final Record person = (Record) toAdd.getValue("person");
+
+ person.setValue("name", "Person " + i);
+ person.setValue("age", i);
+ recordReader.addRecord(toAdd);
+ }
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT RPATH_STRING(person, '/name') AS name FROM FLOWFILE" +
+ " WHERE RPATH_INT(person, '/age') > (" +
+ " SELECT AVG( RPATH_INT(person, '/age') ) FROM FLOWFILE" +
+ ")");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(50, written.size());
+
+ int i=50;
+ for (final Record writtenRecord : written) {
+ final String name = writtenRecord.getAsString("name");
+ assertEquals("Person " + i, name);
+ i++;
+ }
+ }
+
+
+ @Test
+ public void testRecordPathWithArray() throws InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name" +
+ " FROM FLOWFILE" +
+ " WHERE RPATH(addresses, '/state[/label = ''home'']') <>" +
+ " RPATH(addresses, '/state[/label = ''work'']')");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ }
+
+ @Test
+ public void testCompareResultsOfTwoRecordPathsAgainstArray() throws
InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+
+ // Change the value of the 'state' field of both addresses to NY.
+ // This allows us to use an equals operator to ensure that we do get back the same values,
+ // whereas the unit test above tests <> and that may result in 'false confidence' if the software
+ // were to provide the wrong values but values that were not equal.
+ Record[] addresses = (Record[]) record.getValue("addresses");
+ for (final Record address : addresses) {
+ address.setValue("state", "NY");
+ }
+
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name" +
+ " FROM FLOWFILE" +
+ " WHERE RPATH(addresses, '/state[/label = ''home'']') =" +
+ " RPATH(addresses, '/state[/label = ''work'']')");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ }
+
+
+ @Test
+ public void testRecordPathWithArrayAndOnlyOneElementMatchingRPath() throws
InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name" +
+ " FROM FLOWFILE" +
+ " WHERE RPATH(addresses, '/state[. = ''NY'']') = 'NY'");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ }
+
+
+ @Test
+ public void testLikeWithRecordPath() throws InitializationException {
+ final Record record = createHierarchicalArrayRecord();
+ final ArrayListRecordReader recordReader = new
ArrayListRecordReader(record.getSchema());
+ recordReader.addRecord(record);
+
+ final ArrayListRecordWriter writer = new
ArrayListRecordWriter(record.getSchema());
+
+ TestRunner runner = getRunner();
+ runner.addControllerService("reader", recordReader);
+ runner.enableControllerService(recordReader);
+ runner.addControllerService("writer", writer);
+ runner.enableControllerService(writer);
+
+ runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "reader");
+ runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+ runner.setProperty(REL_NAME,
+ "SELECT title, name" +
+ " FROM FLOWFILE" +
+ " WHERE RPATH_STRING(addresses, '/state[. = ''NY'']') LIKE 'N%'");
+
+ runner.enqueue(new byte[0]);
+
+ runner.run();
+
+ runner.assertTransferCount(REL_NAME, 1);
+
+ final List<Record> written = writer.getRecordsWritten();
+ assertEquals(1, written.size());
+
+ final Record output = written.get(0);
+ assertEquals("John Doe", output.getValue("name"));
+ assertEquals("Software Engineer", output.getValue("title"));
+ }
+
+
+
+ /**
+ * Returns a Record that, if written in JSON, would look like:
+ * <code><pre>
+ * {
+ * "person": {
+ * "name": "John Doe",
+ * "age": 30,
+ * "favoriteColors": [ "red", "green" ],
+ * "dob": 598741575825,
+ * "dobTimestamp": 598741575825,
+ * "joinTimestamp": "2018-02-04 10:20:55.802",
+ * "weight": 180.8,
+ * "mother": {
Review comment:
I updated unit tests to include MAP-related tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services