Repository: nifi
Updated Branches:
  refs/heads/master 7e7f9a5de -> b93cf7bbd


NIFI-3658: Created processor for converting records between data formats with 
like schemas

Signed-off-by: Matt Burgess <[email protected]>

NIFI-3658: Incorporated PR review feedback; added counter; clarified 
documentation

Signed-off-by: Matt Burgess <[email protected]>

This closes #1668


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b93cf7bb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b93cf7bb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b93cf7bb

Branch: refs/heads/master
Commit: b93cf7bbdb3daf400845be6538b4eea66be571cc
Parents: 7e7f9a5
Author: Mark Payne <[email protected]>
Authored: Wed Apr 19 14:57:03 2017 -0400
Committer: Matt Burgess <[email protected]>
Committed: Wed Apr 19 16:12:14 2017 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/ConvertRecord.java | 161 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/TestConvertRecord.java  | 126 +++++++++++++++
 .../standard/util/record/MockRecordWriter.java  |  27 +++-
 .../apache/nifi/serialization/RecordReader.java |  26 +++
 5 files changed, 337 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/b93cf7bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
new file mode 100644
index 0000000..9a505a2
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.WriteResult;
+
+@EventDriven
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@SideEffectFree
+@Tags({"convert", "generic", "schema", "json", "csv", "avro", "log", "logs", 
"freeform", "text"})
+@WritesAttributes({
+    @WritesAttribute(attribute = "mime.type", description = "Sets the 
mime.type attribute to the MIME Type specified by the Record Writer"),
+    @WritesAttribute(attribute = "record.count", description = "The number of 
records in the FlowFile")
+})
+@CapabilityDescription("Converts records from one data format to another using 
configured Record Reader and Record Write Controller Services. "
+    + "The Reader and Writer must be configured with \"matching\" schemas. By 
this, we mean the schemas must have the same field names. The types of the 
fields "
+    + "do not have to be the same if a field value can be coerced from one 
type to another. For instance, if the input schema has a field named 
\"balance\" of type double, "
+    + "the output schema can have a field named \"balance\" with a type of 
string, double, or float. If any field is present in the input that is not 
present in the output, "
+    + "the field will be left out of the output. If any field is specified in 
the output schema but is not present in the input data/schema, then the field 
will not be "
+    + "present in the output or will have a null value, depending on the 
writer.")
+public class ConvertRecord extends AbstractProcessor {
+
+    static final PropertyDescriptor RECORD_READER = new 
PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading 
incoming data")
+        .identifiesControllerService(RowRecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new 
PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out 
the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+        .name("success")
+        .description("FlowFiles that are successfully transformed will be 
routed to this relationship")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If a FlowFile cannot be transformed from the configured 
input format to the configured output format, "
+            + "the unchanged FlowFile will be routed to this relationship")
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RowRecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RowRecordReaderFactory.class);
+        final RecordSetWriterFactory writerFactory = 
context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordSetWriter writer = writerFactory.createWriter(getLogger());
+
+        final AtomicReference<WriteResult> writeResultRef = new 
AtomicReference<>();
+
+        final FlowFile original = flowFile;
+        try {
+            flowFile = session.write(flowFile, new StreamCallback() {
+                @Override
+                public void process(final InputStream in, final OutputStream 
out) throws IOException {
+                    try (final RecordReader reader = 
readerFactory.createRecordReader(original, in, getLogger())) {
+
+                        final WriteResult writeResult = 
writer.write(reader.createRecordSet(), out);
+                        writeResultRef.set(writeResult);
+
+                    } catch (final MalformedRecordException e) {
+                        throw new ProcessException("Could not parse incoming 
data", e);
+                    }
+                }
+            });
+        } catch (final ProcessException e) {
+            getLogger().error("Failed to convert {}", new Object[] {flowFile, 
e});
+            session.transfer(flowFile, REL_FAILURE);
+            return;
+        }
+
+        final WriteResult writeResult = writeResultRef.get();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("record.count", 
String.valueOf(writeResult.getRecordCount()));
+        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+        attributes.putAll(writeResult.getAttributes());
+
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        session.transfer(flowFile, REL_SUCCESS);
+        session.adjustCounter("Records Converted", 
writeResult.getRecordCount(), false);
+        getLogger().info("Successfully converted {} records for {}", new 
Object[] {writeResult.getRecordCount(), flowFile});
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b93cf7bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 2f2b0cb..3891ee6 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -18,6 +18,7 @@ org.apache.nifi.processors.standard.CompressContent
 org.apache.nifi.processors.standard.ControlRate
 org.apache.nifi.processors.standard.ConvertCharacterSet
 org.apache.nifi.processors.standard.ConvertJSONToSQL
+org.apache.nifi.processors.standard.ConvertRecord
 org.apache.nifi.processors.standard.DebugFlow
 org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DistributeLoad

http://git-wip-us.apache.org/repos/asf/nifi/blob/b93cf7bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
new file mode 100644
index 0000000..0dcaeec
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+import static org.junit.Assert.assertTrue;
+
+import org.apache.nifi.processors.standard.util.record.MockRecordParser;
+import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Test;
+
+public class TestConvertRecord {
+
+    @Test
+    public void testSuccessfulConversion() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", 
false);
+
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        runner.enqueue("");
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+
+        out.assertAttributeEquals("record.count", "3");
+        out.assertAttributeEquals("mime.type", "text/plain");
+        out.assertContentEquals("header\nJohn Doe,48\nJane Doe,47\nJimmy 
Doe,14\n");
+    }
+
+
+    @Test
+    public void testReadFailure() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser(2);
+        final MockRecordWriter writerService = new MockRecordWriter("header", 
false);
+
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        final MockFlowFile original = runner.enqueue("hello");
+        runner.run();
+
+        // Original FlowFile should be routed to 'failure' relationship 
without modification
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
+        assertTrue(original == out);
+    }
+
+
+    @Test
+    public void testWriteFailure() throws InitializationException {
+        final MockRecordParser readerService = new MockRecordParser();
+        final MockRecordWriter writerService = new MockRecordWriter("header", 
false, 2);
+
+        final TestRunner runner = 
TestRunners.newTestRunner(ConvertRecord.class);
+        runner.addControllerService("reader", readerService);
+        runner.enableControllerService(readerService);
+        runner.addControllerService("writer", writerService);
+        runner.enableControllerService(writerService);
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        readerService.addSchemaField("name", RecordFieldType.STRING);
+        readerService.addSchemaField("age", RecordFieldType.INT);
+
+        readerService.addRecord("John Doe", 48);
+        readerService.addRecord("Jane Doe", 47);
+        readerService.addRecord("Jimmy Doe", 14);
+
+        final MockFlowFile original = runner.enqueue("hello");
+        runner.run();
+
+        // Original FlowFile should be routed to 'failure' relationship 
without modification
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_FAILURE, 1);
+        final MockFlowFile out = 
runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
+        assertTrue(original == out);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/b93cf7bb/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
index 1cf2a28..0a57b29 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordWriter.java
@@ -31,9 +31,21 @@ import org.apache.nifi.serialization.record.RecordSet;
 
 public class MockRecordWriter extends AbstractControllerService implements 
RecordSetWriterFactory {
     private final String header;
+    private final int failAfterN;
+    private final boolean quoteValues;
 
     public MockRecordWriter(final String header) {
+        this(header, true, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues) {
+        this(header, quoteValues, -1);
+    }
+
+    public MockRecordWriter(final String header, final boolean quoteValues, 
final int failAfterN) {
         this.header = header;
+        this.quoteValues = quoteValues;
+        this.failAfterN = failAfterN;
     }
 
     @Override
@@ -48,13 +60,20 @@ public class MockRecordWriter extends 
AbstractControllerService implements Recor
                 final int numCols = rs.getSchema().getFieldCount();
                 Record record = null;
                 while ((record = rs.next()) != null) {
-                    recordCount++;
+                    if (++recordCount > failAfterN && failAfterN > -1) {
+                        throw new IOException("Unit Test intentionally 
throwing IOException after " + failAfterN + " records were written");
+                    }
+
                     int i = 0;
                     for (final String fieldName : 
record.getSchema().getFieldNames()) {
                         final String val = record.getAsString(fieldName);
-                        out.write("\"".getBytes());
-                        out.write(val.getBytes());
-                        out.write("\"".getBytes());
+                        if (quoteValues) {
+                            out.write("\"".getBytes());
+                            out.write(val.getBytes());
+                            out.write("\"".getBytes());
+                        } else {
+                            out.write(val.getBytes());
+                        }
 
                         if (i++ < numCols - 1) {
                             out.write(",".getBytes());

http://git-wip-us.apache.org/repos/asf/nifi/blob/b93cf7bb/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
index b728498..add248e 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReader.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
 
 /**
  * <p>
@@ -51,4 +52,29 @@ public interface RecordReader extends Closeable {
      * @throws MalformedRecordException if an unrecoverable failure occurs 
when trying to parse the underlying data
      */
     RecordSchema getSchema() throws MalformedRecordException;
+
+    /**
+     * @return a RecordSet that returns the records in this Record Reader in a 
streaming fashion
+     */
+    default RecordSet createRecordSet() {
+        return new RecordSet() {
+            @Override
+            public RecordSchema getSchema() throws IOException {
+                try {
+                    return RecordReader.this.getSchema();
+                } catch (final MalformedRecordException mre) {
+                    throw new IOException(mre);
+                }
+            }
+
+            @Override
+            public Record next() throws IOException {
+                try {
+                    return RecordReader.this.nextRecord();
+                } catch (final MalformedRecordException mre) {
+                    throw new IOException(mre);
+                }
+            }
+        };
+    }
 }

Reply via email to