Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 8201381c9 -> 5eed33580


NIFI-551 - ConvertJSONToAvro improve error message

- Report failure counts as a log error message
- Send record parsing errors to a separate flowfile which is transferred down the
  failure relationship

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6f32e6e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6f32e6e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6f32e6e9

Branch: refs/heads/develop
Commit: 6f32e6e9776548b746f6388d25067cd17aa1898c
Parents: 8201381
Author: ricky <[email protected]>
Authored: Tue Apr 28 15:58:09 2015 -0400
Committer: joewitt <[email protected]>
Committed: Thu Jun 4 14:52:16 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/kite/ConvertJSONToAvro.java | 38 ++++++++++++++------
 .../kite/TestJSONToAvroProcessor.java           | 23 ++++++++++--
 2 files changed, 47 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f32e6e9/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 78f80b9..d4cc760 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -38,6 +38,7 @@ import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.kitesdk.data.DatasetException;
 import org.kitesdk.data.DatasetIOException;
@@ -97,22 +98,22 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
     }
 
     @Override
-    public void onTrigger(ProcessContext context, final ProcessSession session)
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session)
             throws ProcessException {
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
+        FlowFile successfulRecords = session.get();
+        if (successfulRecords == null) {
             return;
         }
 
         String schemaProperty = context.getProperty(SCHEMA)
-                .evaluateAttributeExpressions(flowFile)
+                .evaluateAttributeExpressions(successfulRecords)
                 .getValue();
         final Schema schema;
         try {
             schema = getSchema(schemaProperty, DefaultConfiguration.get());
         } catch (SchemaNotFoundException e) {
             getLogger().error("Cannot find schema: " + schemaProperty);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(successfulRecords, FAILURE);
             return;
         }
 
@@ -121,21 +122,31 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
         writer.setCodec(CodecFactory.snappyCodec());
 
         try {
-            flowFile = session.write(flowFile, new StreamCallback() {
+          successfulRecords = session.write(successfulRecords, new 
StreamCallback() {
                 @Override
                 public void process(InputStream in, OutputStream out) throws 
IOException {
+                    FlowFile failedRecords = session.create();
                     long written = 0L;
                     long errors = 0L;
+                    long total = 0L;
                     try (JSONFileReader<Record> reader = new JSONFileReader<>(
                             in, schema, Record.class)) {
                         reader.initialize();
                         try (DataFileWriter<Record> w = writer.create(schema, 
out)) {
                             while (reader.hasNext()) {
+                                total += 1;
                                 try {
                                     Record record = reader.next();
                                     w.append(record);
                                     written += 1;
-                                } catch (DatasetRecordException e) {
+                                } catch (final DatasetRecordException e) {
+                                    failedRecords = 
session.append(failedRecords, new OutputStreamCallback() {
+                                        @Override
+                                        public void process(OutputStream out) 
throws IOException {
+                                            out.write((e.getMessage() + " [" +
+                                              e.getCause().getMessage() + 
"]\n").getBytes());
+                                        }
+                                    });
                                     errors += 1;
                                 }
                             }
@@ -143,21 +154,26 @@ public class ConvertJSONToAvro extends 
AbstractKiteProcessor {
                         session.adjustCounter("Converted records", written,
                                 false /* update only if file transfer is 
successful */);
                         session.adjustCounter("Conversion errors", errors,
-                                false /* update only if file transfer is 
successful */);
+                          false /* update only if file transfer is successful 
*/);
+
+                        if (errors > 0L) {
+                            getLogger().warn("Failed to convert " + errors + 
'/' + total + " records from JSON to Avro");
+                        }
                     }
+                    session.transfer(failedRecords, FAILURE);
                 }
             });
 
-            session.transfer(flowFile, SUCCESS);
+            session.transfer(successfulRecords, SUCCESS);
 
             //session.getProvenanceReporter().send(flowFile, 
target.getUri().toString());
         } catch (ProcessException | DatasetIOException e) {
             getLogger().error("Failed reading or writing", e);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(successfulRecords, FAILURE);
 
         } catch (DatasetException e) {
             getLogger().error("Failed to read FlowFile", e);
-            session.transfer(flowFile, FAILURE);
+            session.transfer(successfulRecords, FAILURE);
 
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6f32e6e9/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index d50e7f9..0b53bc7 100644
--- 
a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ 
b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -18,9 +18,17 @@
  */
 package org.apache.nifi.processors.kite;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Path;
+import java.util.List;
+
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
@@ -38,9 +46,13 @@ public class TestJSONToAvroProcessor {
 
     public static final String JSON_CONTENT = ""
             + "{\"id\": 1,\"color\": \"green\"}"
-            + "{\"id\": \"120V\", \"color\": \"blue\"}\n" + // invalid, ID is 
a string
+            + "{\"id\": \"120V\", \"color\": \"blue\"}\n" // invalid, ID is a 
string
+            + "{\"id\": 10, \"color\": 15.23}\n" + // invalid, color as double
             "{\"id\": 2, \"color\": \"grey\", \"price\": 12.95 }";
 
+    public static final String FAILURE_CONTENT = "Cannot convert field id 
[Cannot convert to long: \"120V\"]\n" +
+      "Cannot convert field color [Cannot convert to string: 15.23]\n";
+
     @Test
     public void testBasicConversion() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
@@ -54,8 +66,13 @@ public class TestJSONToAvroProcessor {
         long converted = runner.getCounterValue("Converted records");
         long errors = runner.getCounterValue("Conversion errors");
         Assert.assertEquals("Should convert 2 rows", 2, converted);
-        Assert.assertEquals("Should reject 1 row", 1, errors);
+        Assert.assertEquals("Should reject 2 rows", 2, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
 
-        runner.assertAllFlowFilesTransferred("success", 1);
+        String failureContent = Bytes.toString(runner.getContentAsByteArray(
+          runner.getFlowFilesForRelationship("failure").get(0)));
+        Assert.assertEquals("Should reject an invalid string and double", 
FAILURE_CONTENT, failureContent);
     }
 }

Reply via email to