Repository: nifi
Updated Branches:
  refs/heads/master 878a0b8b7 -> 08189596d


NIFI-5809: If QueryRecord has a single-column projection and that results in a 
null value, do not confuse that with a null record being returned from the 
Record Reader (which signals that there is no more data to read)

This closes #3163.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08189596
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08189596
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08189596

Branch: refs/heads/master
Commit: 08189596d27af7bb4518646245549b480d4bb05a
Parents: 878a0b8
Author: Mark Payne <[email protected]>
Authored: Fri Nov 9 11:40:59 2018 -0500
Committer: Koji Kawamura <[email protected]>
Committed: Mon Nov 12 14:39:27 2018 +0900

----------------------------------------------------------------------
 .../nifi/queryrecord/FlowFileEnumerator.java    | 38 +++++++------
 .../processors/standard/TestQueryRecord.java    | 57 ++++++++++++++++----
 2 files changed, 64 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/08189596/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
index 963b85e..5f92311 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/queryrecord/FlowFileEnumerator.java
@@ -17,8 +17,6 @@
 
 package org.apache.nifi.queryrecord;
 
-import java.io.InputStream;
-
 import org.apache.calcite.linq4j.Enumerator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -28,6 +26,8 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 
+import java.io.InputStream;
+
 public class FlowFileEnumerator<InternalType> implements Enumerator<Object> {
     private final ProcessSession session;
     private final FlowFile flowFile;
@@ -57,26 +57,24 @@ public class FlowFileEnumerator<InternalType> implements 
Enumerator<Object> {
     @Override
     public boolean moveNext() {
         currentRow = null;
-        while (currentRow == null) {
-            try {
-                currentRow = filterColumns(recordParser.nextRecord());
-                break;
-            } catch (final Exception e) {
-                throw new ProcessException("Failed to read next record in 
stream for " + flowFile + " due to " + e.getMessage(), e);
-            }
-        }
-
-        if (currentRow == null) {
-            // If we are out of data, close the InputStream. We do this because
-            // Calcite does not necessarily call our close() method.
-            close();
-            try {
-                onFinish();
-            } catch (final Exception e) {
-                logger.error("Failed to perform tasks when enumerator was 
finished", e);
+        try {
+            final Record record = recordParser.nextRecord();
+            if (record == null) {
+                // If we are out of data, close the InputStream. We do this 
because
+                // Calcite does not necessarily call our close() method.
+                close();
+                try {
+                    onFinish();
+                } catch (final Exception e) {
+                    logger.error("Failed to perform tasks when enumerator was 
finished", e);
+                }
+
+                return false;
             }
 
-            return false;
+            currentRow = filterColumns(record);
+        } catch (final Exception e) {
+            throw new ProcessException("Failed to read next record in stream 
for " + flowFile + " due to " + e.getMessage(), e);
         }
 
         recordsRead++;

http://git-wip-us.apache.org/repos/asf/nifi/blob/08189596/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 60fefef..ce710f5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,15 +16,6 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.reporting.InitializationException;
@@ -46,6 +37,15 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
 public class TestQueryRecord {
 
     static {
@@ -289,12 +289,47 @@ public class TestQueryRecord {
         runner.run();
 
         runner.assertTransferCount(REL_NAME, 1);
-        final MockFlowFile flowFileOut = 
runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS).get(0);
+        final MockFlowFile flowFileOut = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
         
flowFileOut.assertContentEquals("\"name\",\"points\"\n\"Tom\",\"100\"\n\"Jerry\",\"2\"\n");
     }
 
     @Test
-    public void testColumnNames() throws InitializationException, IOException {
+    public void testNullValueInSingleField() throws InitializationException, 
IOException {
+        final MockRecordParser parser = new MockRecordParser();
+        parser.addSchemaField("name", RecordFieldType.STRING);
+        parser.addSchemaField("points", RecordFieldType.INT);
+        parser.addRecord("Tom", 1);
+        parser.addRecord("Jerry", null);
+        parser.addRecord("Tom", null);
+        parser.addRecord("Jerry", 3);
+
+        final MockRecordWriter writer = new MockRecordWriter(null, false);
+
+        TestRunner runner = getRunner();
+        runner.addControllerService("parser", parser);
+        runner.enableControllerService(parser);
+        runner.addControllerService("writer", writer);
+        runner.enableControllerService(writer);
+
+        runner.setProperty(REL_NAME, "select points from FLOWFILE");
+        runner.setProperty("count", "select count(*) as c from flowfile where 
points is null");
+        runner.setProperty(QueryRecord.RECORD_READER_FACTORY, "parser");
+        runner.setProperty(QueryRecord.RECORD_WRITER_FACTORY, "writer");
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertTransferCount(REL_NAME, 1);
+        runner.assertTransferCount("count", 1);
+        final MockFlowFile flowFileOut = 
runner.getFlowFilesForRelationship(REL_NAME).get(0);
+        flowFileOut.assertContentEquals("1\n\n\n3\n");
+
+        final MockFlowFile countFlowFile = 
runner.getFlowFilesForRelationship("count").get(0);
+        countFlowFile.assertContentEquals("2\n");
+    }
+
+    @Test
+    public void testColumnNames() throws InitializationException {
         final MockRecordParser parser = new MockRecordParser();
         parser.addSchemaField("name", RecordFieldType.STRING);
         parser.addSchemaField("points", RecordFieldType.INT);

Reply via email to