http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-docs/src/main/webapp/WEB-INF/jsp/documentation.jsp ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-docs/src/main/webapp/WEB-INF/jsp/documentation.jsp b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-docs/src/main/webapp/WEB-INF/jsp/documentation.jsp index fd8660a..f3beab4 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-docs/src/main/webapp/WEB-INF/jsp/documentation.jsp +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-docs/src/main/webapp/WEB-INF/jsp/documentation.jsp @@ -61,6 +61,7 @@ <li class="component-item"><a class="document-link getting-started" href="html/getting-started.html" target="component-usage">Getting Started</a></li> <li class="component-item"><a class="document-link user-guide" href="html/user-guide.html" target="component-usage">User Guide</a></li> <li class="component-item"><a class="document-link expression-language-guide" href="html/expression-language-guide.html" target="component-usage">Expression Language Guide</a></li> + <li class="component-item"><a class="document-link record-path-guide" href="html/record-path-guide.html" target="component-usage">RecordPath Guide</a></li> <li class="component-item"><a class="document-link admin-guide" href="html/administration-guide.html" target="component-usage">Admin Guide</a></li> </ul> <span class="no-matching no-components hidden">No matching guides</span>
http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java index ac24e1f..1ccce07 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java @@ -415,7 +415,8 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe try { final RecordSetWriter writer; try { - writer = writerFactory.createWriter(logger, flowFile, new ByteArrayInputStream(records.get(0).value())); + final RecordSchema schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value())); + writer = writerFactory.createWriter(logger, schema); } catch (final Exception e) { logger.error( "Failed to obtain a Record Writer for serializing Kafka messages. This generally happens because the " http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java index f96a575..442ccc5 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java @@ -59,6 +59,7 @@ import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.RecordWriter; +import org.apache.nifi.serialization.record.RecordSchema; @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.10.x"}) @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.10.x Producer API. " @@ -325,7 +326,10 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor { final RecordWriter writer; try (final InputStream in = new BufferedInputStream(session.read(flowFile))) { - writer = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class).createWriter(getLogger(), flowFile, in); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSchema schema = writerFactory.getSchema(flowFile, in); + + writer = writerFactory.createWriter(getLogger(), schema); } catch (final Exception e) { getLogger().error("Failed to create a Record Writer for {}; routing to failure", new Object[] {flowFile, e}); session.transfer(flowFile, REL_FAILURE); http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java index 22d7249..a1abda4 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordWriter.java @@ -25,10 +25,12 @@ import java.util.Collections; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; public class MockRecordWriter extends AbstractControllerService implements RecordSetWriterFactory { @@ -51,7 +53,12 @@ public class MockRecordWriter extends AbstractControllerService implements Recor } @Override - public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) { + public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException { + return null; + } + + @Override + public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) { return new RecordSetWriter() { @Override public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java index a4f1513..5b4819a 100644 --- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java +++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java @@ -16,6 +16,17 @@ */ package org.apache.nifi.processors.parquet; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -33,6 +44,7 @@ import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -44,18 +56,6 @@ import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.charset.StandardCharsets; -import java.util.HashMap; -import java.util.Map; - -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.when; - public class FetchParquetTest { static final String DIRECTORY = "target"; @@ -221,7 +221,7 @@ public class FetchParquetTest { final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class); when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory"); - when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(FlowFile.class), any(InputStream.class))).thenReturn(recordSetWriter); + when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(RecordSchema.class))).thenReturn(recordSetWriter); testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory); testRunner.enableControllerService(recordSetWriterFactory); http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java index dd0be2b..1dde047 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -28,6 +28,7 @@ import org.apache.nifi.processors.script.ScriptEngineConfigurator; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.record.RecordSchema; import javax.script.Invocable; import javax.script.ScriptException; @@ -45,16 +46,21 @@ import java.util.HashSet; @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<RecordSetWriterFactory> implements RecordSetWriterFactory { + @Override @OnEnabled public void onEnabled(final ConfigurationContext context) { super.onEnabled(context); } + public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException { + return createWriter(logger, getSchema(flowFile, in)); + } + @Override - public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException { + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException { if (recordFactory.get() != null) { try { - return recordFactory.get().createWriter(logger, flowFile, flowFileContent); + return recordFactory.get().createWriter(logger, schema); } catch (UndeclaredThrowableException ute) { throw new IOException(ute.getCause()); } @@ -69,6 +75,7 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor * @param scriptBody An input stream associated with the script content * @return Whether the script was successfully reloaded */ + @Override protected boolean reloadScript(final String scriptBody) { // note we are starting here with a fresh listing of validation // results since we are (re)loading a new/updated script. any @@ -142,4 +149,18 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor // return whether there was any issues loading the configured script return results.isEmpty(); } + + @Override + public RecordSchema getSchema(FlowFile flowFile, InputStream in) throws SchemaNotFoundException, IOException { + final RecordSetWriterFactory writerFactory = recordFactory.get(); + if (writerFactory == null) { + return null; + } + + try { + return writerFactory.getSchema(flowFile, in); + } catch (UndeclaredThrowableException ute) { + throw new IOException(ute.getCause()); + } + } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy index d4e7d5a..2e1c03d 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -103,7 +103,8 @@ class ScriptedRecordSetWriterTest { MockFlowFile mockFlowFile = new MockFlowFile(1L) InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) - RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, mockFlowFile, inStream) + def schema = recordSetWriterFactory.getSchema(mockFlowFile, inStream) + RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, schema) assertNotNull(recordSetWriter) def recordSchema = new SimpleRecordSchema( http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy index e17b701..4fae4fe 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -16,6 +16,10 @@ */ import groovy.xml.MarkupBuilder + +import java.io.IOException +import java.io.InputStream + import org.apache.nifi.controller.AbstractControllerService import org.apache.nifi.flowfile.FlowFile import org.apache.nifi.logging.ComponentLog @@ -24,6 +28,7 @@ import org.apache.nifi.serialization.RecordSetWriter import org.apache.nifi.serialization.RecordSetWriterFactory import org.apache.nifi.serialization.WriteResult import org.apache.nifi.serialization.record.Record +import org.apache.nifi.serialization.record.RecordSchema import org.apache.nifi.serialization.record.RecordSet import org.apache.nifi.stream.io.NonCloseableOutputStream @@ -73,7 +78,12 @@ class GroovyRecordSetWriter implements RecordSetWriter { class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { @Override - RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException { + public RecordSchema getSchema(FlowFile flowFile, InputStream inStream) throws SchemaNotFoundException, IOException { + return null; + } + + @Override + RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) throws SchemaNotFoundException, IOException { return new GroovyRecordSetWriter() } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index 7de1dab..1372735 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -34,6 +34,10 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-path</artifactId> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-ssl-context-service-api</artifactId> </dependency> <dependency> @@ -233,6 +237,17 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-record-serialization-services</artifactId> + <version>1.3.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-schema-registry-service-api</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.derby</groupId> <artifactId>derby</artifactId> <scope>test</scope> @@ -435,6 +450,12 @@ <exclude>src/test/resources/TestExtractGrok/apache.log</exclude> <exclude>src/test/resources/TestExtractGrok/simple_text.log</exclude> <exclude>src/test/resources/TestExtractGrok/patterns</exclude> + <exclude>src/test/resources/TestUpdateRecord/input/person.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude> + <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude> <!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 --> <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude> </excludes> http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java new file mode 100644 index 0000000..b6cc83b --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractRecordProcessor.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.StreamCallback; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.RecordSet; + +public abstract class AbstractRecordProcessor extends AbstractProcessor { + + static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() + .name("record-reader") + .displayName("Record Reader") + .description("Specifies the Controller Service to use for reading incoming data") + .identifiesControllerService(RecordReaderFactory.class) + .required(true) + .build(); + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() + .name("record-writer") + .displayName("Record Writer") + .description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class) + .required(true) + .build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("FlowFiles that are successfully transformed will be routed to this relationship") + .build(); + static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship") + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(); + properties.add(RECORD_READER); + properties.add(RECORD_WRITER); + return properties; + } + + @Override + public Set<Relationship> getRelationships() { + final Set<Relationship> relationships = new HashSet<>(); + relationships.add(REL_SUCCESS); + relationships.add(REL_FAILURE); + return relationships; + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + FlowFile flowFile = session.get(); + if (flowFile == null) { + return; + } + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + final RecordSetWriter writer; + final RecordSchema writeSchema; + try (final InputStream rawIn = session.read(flowFile); + final InputStream in = new BufferedInputStream(rawIn)) { + writeSchema = writerFactory.getSchema(flowFile, in); + writer = writerFactory.createWriter(getLogger(), writeSchema); + } catch (final Exception e) { + getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>(); + + final FlowFile original = flowFile; + try { + flowFile = session.write(flowFile, new StreamCallback() { + @Override + public void process(final InputStream in, final OutputStream out) throws IOException { + try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { + + final RecordSet recordSet = new RecordSet() { + @Override + public RecordSchema getSchema() throws IOException { + try { + return reader.getSchema(); + } catch (final MalformedRecordException e) { + throw new IOException(e); + } catch (final Exception e) { + throw new ProcessException(e); + } + } + + @Override + public Record next() throws IOException { + try { + final Record record = reader.nextRecord(); + if (record == null) { + return null; + } + + return AbstractRecordProcessor.this.process(record, writeSchema, original, context); + } catch (final MalformedRecordException e) { + throw new IOException(e); + } catch (final Exception e) { + throw new ProcessException(e); + } + } + }; + + final WriteResult writeResult = writer.write(recordSet, out); + writeResultRef.set(writeResult); + + } catch (final SchemaNotFoundException | MalformedRecordException e) { + throw new ProcessException("Could not parse incoming data", e); + } + } + }); + } catch (final Exception e) { + getLogger().error("Failed to convert {}", new Object[] {flowFile, e}); + session.transfer(flowFile, REL_FAILURE); + return; + } + + final WriteResult writeResult = writeResultRef.get(); + + final Map<String, String> attributes = new HashMap<>(); + attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + attributes.putAll(writeResult.getAttributes()); + + flowFile = session.putAllAttributes(flowFile, attributes); + session.transfer(flowFile, REL_SUCCESS); + session.adjustCounter("Records Processed", writeResult.getRecordCount(), false); + getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile}); + } + + protected abstract Record process(Record record, RecordSchema writeSchema, FlowFile flowFile, ProcessContext context); +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java index 2b2caa4..f87339d 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ConvertRecord.java @@ -17,18 +17,6 @@ package org.apache.nifi.processors.standard; -import java.io.BufferedInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; @@ -38,28 +26,16 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.flowfile.attributes.CoreAttributes; -import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.processor.io.StreamCallback; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; -import org.apache.nifi.serialization.RecordReader; -import org.apache.nifi.serialization.RecordReaderFactory; -import org.apache.nifi.serialization.RecordSetWriter; -import org.apache.nifi.serialization.RecordSetWriterFactory; -import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; @EventDriven @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @SideEffectFree -@Tags({"convert", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) +@Tags({"convert", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) @WritesAttributes({ @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"), @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile") @@ -70,102 +46,11 @@ import org.apache.nifi.serialization.WriteResult; + "the output schema can have a field named \"balance\" with a type of string, double, or float. If any field is present in the input that is not present in the output, " + "the field will be left out of the output. If any field is specified in the output schema but is not present in the input data/schema, then the field will not be " + "present in the output or will have a null value, depending on the writer.") -public class ConvertRecord extends AbstractProcessor { - - static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name("record-reader") - .displayName("Record Reader") - .description("Specifies the Controller Service to use for reading incoming data") - .identifiesControllerService(RecordReaderFactory.class) - .required(true) - .build(); - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() - .name("record-writer") - .displayName("Record Writer") - .description("Specifies the Controller Service to use for writing out the records") - .identifiesControllerService(RecordSetWriterFactory.class) - .required(true) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("FlowFiles that are successfully transformed will be routed to this relationship") - .build(); - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " - + "the unchanged FlowFile will be routed to this relationship") - .build(); - - @Override - protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { - final List<PropertyDescriptor> properties = new ArrayList<>(); - properties.add(RECORD_READER); - properties.add(RECORD_WRITER); - return properties; - } - - @Override - public Set<Relationship> getRelationships() { - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - return relationships; - } +public class ConvertRecord extends AbstractRecordProcessor { @Override - public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (flowFile == null) { - return; - } - - final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); - final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSetWriter writer; - try (final InputStream rawIn = session.read(flowFile); - final InputStream in = new BufferedInputStream(rawIn)) { - writer = writerFactory.createWriter(getLogger(), flowFile, in); - } catch (final Exception e) { - getLogger().error("Failed to convert records for {}; will route to failure", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final AtomicReference<WriteResult> writeResultRef = new AtomicReference<>(); - - final FlowFile original = flowFile; - try { - flowFile = session.write(flowFile, new StreamCallback() { - @Override - public void process(final InputStream in, final OutputStream out) throws IOException { - try (final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - - final WriteResult writeResult = writer.write(reader.createRecordSet(), out); - writeResultRef.set(writeResult); - - } catch (final SchemaNotFoundException | MalformedRecordException e) { - throw new ProcessException("Could not parse incoming data", e); - } - } - }); - } catch (final ProcessException e) { - getLogger().error("Failed to convert {}", new Object[] {flowFile, e}); - session.transfer(flowFile, REL_FAILURE); - return; - } - - final WriteResult writeResult = writeResultRef.get(); - - final Map<String, String> attributes = new HashMap<>(); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); - - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, REL_SUCCESS); - session.adjustCounter("Records Converted", writeResult.getRecordCount(), false); - getLogger().info("Successfully converted {} records for {}", new Object[] {writeResult.getRecordCount(), flowFile}); + protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) { + return record; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java index 5982908..3cb5cce 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/QueryRecord.java @@ -254,7 +254,7 @@ public class QueryRecord extends AbstractProcessor { final RecordSetWriter resultSetWriter; try (final InputStream rawIn = session.read(original); final InputStream in = new BufferedInputStream(rawIn)) { - resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), original, in); + resultSetWriter = resultSetWriterFactory.createWriter(getLogger(), resultSetWriterFactory.getSchema(original, in)); } for (final PropertyDescriptor descriptor : context.getProperties().keySet()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java index 05aa98c..62ca521 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java @@ -136,7 +136,7 @@ public class SplitRecord extends AbstractProcessor { final RecordSetWriter writer; try (final InputStream rawIn = session.read(original); final InputStream in = new BufferedInputStream(rawIn)) { - writer = writerFactory.createWriter(getLogger(), original, in); + writer = writerFactory.createWriter(getLogger(), writerFactory.getSchema(original, in)); } catch (final Exception e) { getLogger().error("Failed to create Record Writer for {}; routing to failure", new Object[] {original, e}); session.transfer(original, REL_FAILURE); http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java new file mode 100644 index 0000000..9151cde --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.nifi.annotation.behavior.EventDriven; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.SupportsBatching; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.SeeAlso; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.record.path.FieldValue; +import org.apache.nifi.record.path.RecordPath; +import org.apache.nifi.record.path.RecordPathResult; +import org.apache.nifi.record.path.util.RecordPathCache; +import org.apache.nifi.record.path.validation.RecordPathPropertyNameValidator; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordSchema; + + +@EventDriven +@SideEffectFree +@SupportsBatching +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"update", "record", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"}) +@CapabilityDescription("Updates the contents of a FlowFile that contains Record-oriented data (i.e., data that can be read via a RecordReader and written by a RecordWriter). " + + "This Processor requires that at least one user-defined Property be added. The name of the Property should indicate a RecordPath that determines the field that should " + + "be updated. The value of the Property is either a replacement value (optionally making use of the Expression Language) or is itself a RecordPath that extracts a value from " + + "the Record. Whether the Property value is determined to be a RecordPath or a literal value depends on the configuration of the <Replacement Value Strategy> Property.") +@SeeAlso({ConvertRecord.class}) +public class UpdateRecord extends AbstractRecordProcessor { + + private volatile RecordPathCache recordPathCache; + private volatile List<String> recordPaths; + + static final AllowableValue LITERAL_VALUES = new AllowableValue("literal-value", "Literal Value", + "The value entered for a Property (after Expression Language has been evaluated) is the desired value to update the Record Fields with."); + static final AllowableValue RECORD_PATH_VALUES = new AllowableValue("record-path-value", "Record Path Value", + "The value entered for a Property (after Expression Language has been evaluated) is not the literal value to use but rather is a Record Path " + + "that should be evaluated against the Record, and the result of the RecordPath will be used to update the Record. Note that if this option is selected, " + + "and the Record Path results in multiple values for a given Record, the input FlowFile will be routed to the 'failure' Relationship."); + + static final PropertyDescriptor REPLACEMENT_VALUE_STRATEGY = new PropertyDescriptor.Builder() + .name("replacement-value-strategy") + .displayName("Replacement Value Strategy") + .description("Specifies how to interpret the configured replacement values") + .allowableValues(LITERAL_VALUES, RECORD_PATH_VALUES) + .defaultValue(LITERAL_VALUES.getValue()) + .expressionLanguageSupported(false) + .required(true) + .build(); + + @Override + protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { + final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors()); + properties.add(REPLACEMENT_VALUE_STRATEGY); + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Specifies the value to use to replace fields in the record that match the RecordPath: " + propertyDescriptorName) + .required(false) + .dynamic(true) + .expressionLanguageSupported(true) + .addValidator(new RecordPathPropertyNameValidator()) + .build(); + } + + @Override + protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { + final boolean containsDynamic = validationContext.getProperties().keySet().stream() + .anyMatch(property -> property.isDynamic()); + + if (containsDynamic) { + return Collections.emptyList(); + } + + return Collections.singleton(new ValidationResult.Builder() + .subject("User-defined Properties") + .valid(false) + .explanation("At least one RecordPath must be specified") + .build()); + } + + @OnScheduled + public void createRecordPaths(final ProcessContext context) { + recordPathCache = new RecordPathCache(context.getProperties().size() * 2); + + final List<String> recordPaths = new ArrayList<>(context.getProperties().size() - 2); + for (final PropertyDescriptor property : context.getProperties().keySet()) { + if (property.isDynamic()) { + recordPaths.add(property.getName()); + } + } + + this.recordPaths = recordPaths; + } + + @Override + protected Record process(final Record record, final RecordSchema writeSchema, final FlowFile flowFile, final ProcessContext context) { + final boolean evaluateValueAsRecordPath = context.getProperty(REPLACEMENT_VALUE_STRATEGY).getValue().equals(RECORD_PATH_VALUES.getValue()); + + // Incorporate the RecordSchema that we will use for writing records into the Schema that we have + // for the record, because it's possible that the updates to the record will not be valid otherwise. + record.incorporateSchema(writeSchema); + + for (final String recordPathText : recordPaths) { + final RecordPath recordPath = recordPathCache.getCompiled(recordPathText); + final RecordPathResult result = recordPath.evaluate(record); + + final String replacementValue = context.getProperty(recordPathText).evaluateAttributeExpressions(flowFile).getValue(); + if (evaluateValueAsRecordPath) { + final RecordPath replacementRecordPath = recordPathCache.getCompiled(replacementValue); + + // If we have an Absolute RecordPath, we need to evaluate the RecordPath only once against the Record. + // If the RecordPath is a Relative Path, then we have to evaluate it against each FieldValue. + if (replacementRecordPath.isAbsolute()) { + processAbsolutePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue); + } else { + processRelativePath(replacementRecordPath, result.getSelectedFields(), record, replacementValue); + } + } else { + result.getSelectedFields().forEach(fieldVal -> fieldVal.updateValue(replacementValue)); + } + } + + return record; + } + + private void processAbsolutePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) { + final RecordPathResult replacementResult = replacementRecordPath.evaluate(record); + final Object replacementObject = getReplacementObject(replacementResult, replacementValue); + destinationFields.forEach(fieldVal -> fieldVal.updateValue(replacementObject)); + } + + private void processRelativePath(final RecordPath replacementRecordPath, final Stream<FieldValue> destinationFields, final Record record, final String replacementValue) { + destinationFields.forEach(fieldVal -> { + final RecordPathResult replacementResult = replacementRecordPath.evaluate(fieldVal); + final Object replacementObject = getReplacementObject(replacementResult, replacementValue); + fieldVal.updateValue(replacementObject); + }); + } + + private Object getReplacementObject(final RecordPathResult recordPathResult, final String replacementValue) { + final List<FieldValue> selectedFields = recordPathResult.getSelectedFields().collect(Collectors.toList()); + + if (selectedFields.size() > 1) { + throw new ProcessException("Cannot update Record because the Replacement Record Path \"" + replacementValue + "\" yielded " + + selectedFields.size() + " results but this Processor only supports a single result."); + } + + if (selectedFields.isEmpty()) { + return null; + } else { + return selectedFields.get(0).getValue(); + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index b4085c8..221fe0a 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -102,3 +102,4 @@ org.apache.nifi.processors.standard.FetchDistributedMapCache org.apache.nifi.processors.standard.ListFTP org.apache.nifi.processors.standard.FetchFTP org.apache.nifi.processors.standard.UpdateCounter +org.apache.nifi.processors.standard.UpdateRecord \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html index 93bbe2a..d289069 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.QueryRecord/additionalDetails.html @@ -18,7 +18,7 @@ <meta charset="utf-8" /> <title>QueryRecord</title> - <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /> + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> </head> <body> http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html new file mode 100644 index 0000000..c20f48a --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.UpdateRecord/additionalDetails.html @@ -0,0 +1,332 @@ +<!DOCTYPE html> +<html lang="en"> + <!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + <head> + <meta charset="utf-8" /> + <title>UpdateRecord</title> + + <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" /> + </head> + + <body> + <p> + UpdateRecord makes use of the NiFi <a href="/nifi-docs/html/record-path-guide.html"> + RecordPath Domain-Specific Language (DSL)</a> to allow the user + to indicate which field(s) in the Record should be updated. Users do this by adding a User-defined + Property to the Processor's configuration. The name of the User-defined Property must be the + RecordPath text that should be evaluated against each Record. The value of the Property specifies + what value should go into that selected Record field. + </p> + + <p> + When specifying the replacement value (the value of the User-defined Property), the user is able + to specify a literal value such as the number <code>10</code>; an Expression Language Expression + to reference FlowFile attributes, such as <code>${filename}</code>; or another RecordPath path + from which to retrieve the desired value from the Record itself. Whether the value entered should + be interpreted as a literal or a RecordPath path is determined by the value of the <Replacement + Value Strategy> Property. + </p> + + <p> + If a RecordPath is given and does not match any field in an input Record, that Property will be skipped + and all other Properties will still be evaluated. If the RecordPath matches exactly one field, that field + will be updated with the corresponding value. If multiple fields match the RecordPath, then all fields that + match will be updated. If the replacement value is itself a RecordPath that does not match, then a + <code>null</code> value will be set for the field. For instances where this is not the desired behavior, + RecordPath predicates can be used to filter the fields that match so that no fields will be selected. + See <a href="/nifi-docs/html/record-path-guide.html#predicates">RecordPath Predicates</a> for more information. + </p> + + <p> + Below, we lay out some examples in order to provide clarity about the Processor's behavior. For all of + the examples below, consider the example to operate on the following set of 2 (JSON) records: + </p> + +<code> +<pre> + [{ + "id": 17, + "name": "John", + "child": { + "id": "1" + }, + "siblingIds": [4, 8], + "siblings": [ + { "name": "Jeremy", "id": 4 }, + { "name": "Julia", "id": 8 } + ] + }, + { + "id": 98, + "name": "Jane", + "child": { + "id": 2 + }, + "gender": "F", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + <p> + For brevity, we will omit the corresponding schema and configuration of the RecordReader and + RecordWriter. Otherwise, consider the following set of Properties are configured for the Processor + and their associated outputs. + </p> + + <h3>Example 1 - Replace with Literal</h3> + + <p> + Here, we will replace the name of each Record with the name 'Jeremy' and set the gender to 'M': + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Replacement Value Strategy</td> + <td>Literal Value</td> + </tr> + <tr> + <td>/name</td> + <td>Jeremy</td> + </tr> + <tr> + <td>/gender</td> + <td>M</td> + </tr> + </table> + + <p> + This will yield the following output: + </p> + +<code> +<pre> + [{ + "id": 17, + "name": "Jeremy", + "child": { + "id": "1" + }, + "gender": "M", + "siblingIds": [4, 8], + "siblings": [ + { "name": "Jeremy", "id": 4 }, + { "name": "Julia", "id": 8 } + ] + }, + { + "id": 98, + "name": "Jeremy", + "child": { + "id": 2 + }, + "gender": "M", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + <p> + Note that even though the first record did not have a "gender" field in the input, one + will be added after the "child" field, as that's where the field is located in the schema. + </p> + + + <h3>Example 2 - Replace with RecordPath</h3> + + <p> + This example will replace the value in one field of the Record with the value from another field. + For this example, consider the following set of Properties: + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Replacement Value Strategy</td> + <td>Record Path Value</td> + </tr> + <tr> + <td>/name</td> + <td>/siblings[0]/name</td> + </tr> + </table> + + <p> + This will yield the following output: + </p> + +<code> +<pre> + [{ + "id": 17, + "name": "Jeremy", + "child": { + "id": "1" + }, + "siblingIds": [4, 8], + "siblings": [ + { "name": "Jeremy", "id": 4 }, + { "name": "Julia", "id": 8 } + ] + }, + { + "id": 98, + "name": null, + "child": { + "id": 2 + }, + "gender": "F", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + + + <h3>Example 3 - Replace with Relative RecordPath</h3> + + <p> + In the above example, we replaced the value of field based on another RecordPath. That RecordPath was an "absolute RecordPath," + meaning that it starts with a "slash" character (<code>/</code>) and therefore it specifies the path from the "root" or "outer most" element. + However, sometimes we want to reference a field in such a way that we defined the RecordPath relative to the field being updated. This example + does just that. For each of the siblings given in the "siblings" array, we will replace the sibling's name with their id's. To do so, we will + configure the processor with the following properties: + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Replacement Value Strategy</td> + <td>Record Path Value</td> + </tr> + <tr> + <td>/siblings[*]/name</td> + <td>../id</td> + </tr> + </table> + + <p> + Note that the RecordPath that was given for the value starts with <code>..</code>, which is a reference to the parent. We do this because the field + that we are going to update is the "name" field of the sibling. To get to the associated "id" field, we need to go to the "name" field's parent and then + to its "id" child field. The above example results in the following output: + </p> + +<code> +<pre> + [{ + "id": 17, + "name": "John", + "child": { + "id": "1" + }, + "siblingIds": [4, 8], + "siblings": [ + { "name": "4", "id": 4 }, + { "name": "8", "id": 8 } + ] + }, + { + "id": 98, + "name": "Jane", + "child": { + "id": 2 + }, + "gender": "F", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + + + + <h3>Example 4 - Replace Multiple Values</h3> + + <p> + This example will replace the value of all fields that have the name "id", regardless of + where in the Record hierarchy the field is found. The value that it uses references the Expression Language, + so for this example, let's assume that the incoming FlowFile has an attribute named "replacement.id" that + has a value of "91": + </p> + + <table> + <tr> + <th>Property Name</th> + <th>Property Value</th> + </tr> + <tr> + <td>Replacement Value Strategy</td> + <td>Literal Value</td> + </tr> + <tr> + <td>//id</td> + <td>${replacement.id}</td> + </tr> + </table> + + <p> + This will yield the following output: + </p> + +<code> +<pre> + [{ + "id": 91, + "name": "John", + "child": { + "id": "91" + }, + "siblingIds": [4, 8], + "siblings": [ + { "name": "Jeremy", "id": 91 }, + { "name": "Julia", "id": 91 } + ] + }, + { + "id": 91, + "name": "Jane", + "child": { + "id": 91 + }, + "gender": "F", + "siblingIds": [], + "siblings": [] + }] +</pre> +</code> + + <p> + It is also worth noting that in this example, some of the "id" fields were of type STRING, while + others were of type INT. This is okay because the RecordReaders and RecordWriters should handle + these simple type coercions for us. + </p> + + </body> +</html> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java index 70d3e87..023fba7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java @@ -20,13 +20,17 @@ import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.MockRecordParser; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.RecordSet; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -41,6 +45,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; public class TestQueryRecord { @@ -256,7 +261,15 @@ public class TestQueryRecord { } @Override - public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream in) { + public RecordSchema getSchema(FlowFile flowFile, InputStream content) throws SchemaNotFoundException, IOException { + final List<RecordField> recordFields = columnNames.stream() + .map(name -> new RecordField(name, RecordFieldType.STRING.getDataType())) + .collect(Collectors.toList()); + return new SimpleRecordSchema(recordFields); + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema) { return new RecordSetWriter() { @Override public WriteResult write(final RecordSet rs, final OutputStream out) throws IOException { http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java new file mode 100644 index 0000000..6d88e57 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestUpdateRecord.java @@ -0,0 +1,217 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.standard; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collections; + +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.reporting.InitializationException; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.serialization.record.MockRecordParser; +import org.apache.nifi.serialization.record.MockRecordWriter; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +public class TestUpdateRecord { + + private TestRunner runner; + private MockRecordParser readerService; + private MockRecordWriter writerService; + + @Before + public void setup() throws InitializationException { + readerService = new MockRecordParser(); + writerService = new MockRecordWriter("header", false); + + runner = TestRunners.newTestRunner(UpdateRecord.class); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + runner.addControllerService("writer", writerService); + runner.enableControllerService(writerService); + + runner.setProperty(UpdateRecord.RECORD_READER, "reader"); + runner.setProperty(UpdateRecord.RECORD_WRITER, "writer"); + + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("age", RecordFieldType.INT); + } + + + @Test + public void testLiteralReplacementValue() { + runner.setProperty("/name", "Jane Doe"); + runner.enqueue(""); + + readerService.addRecord("John Doe", 35); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0); + out.assertContentEquals("header\nJane Doe,35\n"); + } + + @Test + public void testLiteralReplacementValueExpressionLanguage() { + runner.setProperty("/name", "${newName}"); + runner.enqueue("", Collections.singletonMap("newName", "Jane Doe")); + + readerService.addRecord("John Doe", 35); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0); + out.assertContentEquals("header\nJane Doe,35\n"); + } + + @Test + public void testRecordPathReplacementValue() { + runner.setProperty("/name", "/age"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue()); + runner.enqueue(""); + + readerService.addRecord("John Doe", 35); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final MockFlowFile out = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0); + out.assertContentEquals("header\n35,35\n"); + } + + @Test + public void testInvalidRecordPathUsingExpressionLanguage() { + runner.setProperty("/name", "${recordPath}"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue()); + runner.enqueue("", Collections.singletonMap("recordPath", "hello")); + + readerService.addRecord("John Doe", 35); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_FAILURE, 1); + } + + @Test + public void testReplaceWithMissingRecordPath() throws InitializationException { + readerService = new MockRecordParser(); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("siblings", RecordFieldType.ARRAY); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + + runner.setProperty("/name", "/siblings[0]/name"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue()); + + runner.enqueue("", Collections.singletonMap("recordPath", "hello")); + + readerService.addRecord("John Doe", null); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0); + mff.assertContentEquals("header\n,\n"); + } + + @Test + public void testRelativePath() throws InitializationException { + readerService = new MockRecordParser(); + readerService.addSchemaField("name", RecordFieldType.STRING); + readerService.addSchemaField("nickname", RecordFieldType.STRING); + runner.addControllerService("reader", readerService); + runner.enableControllerService(readerService); + + runner.setProperty("/name", "../nickname"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES.getValue()); + + runner.enqueue("", Collections.singletonMap("recordPath", "hello")); + + readerService.addRecord("John Doe", "Johnny"); + runner.run(); + + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final MockFlowFile mff = runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0); + mff.assertContentEquals("header\nJohnny,Johnny\n"); + } + + @Test + public void testChangingSchema() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/name", "/name/first"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-firstname.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } + + @Test + public void testAddFieldNotInInputRecord() throws InitializationException, IOException { + final JsonTreeReader jsonReader = new JsonTreeReader(); + runner.addControllerService("reader", jsonReader); + + final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc"))); + final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc"))); + + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText); + runner.enableControllerService(jsonReader); + + final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter(); + runner.addControllerService("writer", jsonWriter); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); + runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText); + runner.setProperty(jsonWriter, "Pretty Print JSON", "true"); + runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute"); + runner.enableControllerService(jsonWriter); + + runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); + runner.setProperty("/firstName", "/name/first"); + runner.setProperty("/lastName", "/name/last"); + runner.setProperty(UpdateRecord.REPLACEMENT_VALUE_STRATEGY, UpdateRecord.RECORD_PATH_VALUES); + + runner.run(); + runner.assertAllFlowFilesTransferred(UpdateRecord.REL_SUCCESS, 1); + final String expectedOutput = new String(Files.readAllBytes(Paths.get("src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json"))); + runner.getFlowFilesForRelationship(UpdateRecord.REL_SUCCESS).get(0).assertContentEquals(expectedOutput); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json new file mode 100644 index 0000000..7538381 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/input/person.json @@ -0,0 +1,7 @@ +{ + "id": 485, + "name": { + "last": "Doe", + "first": "John" + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json new file mode 100644 index 0000000..8e6dea7 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname-lastname.json @@ -0,0 +1,5 @@ +[ { + "id" : 485, + "firstName" : "John", + "lastName" : "Doe" +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname.json ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname.json new file mode 100644 index 0000000..b91adc5 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/output/person-with-firstname.json @@ -0,0 +1,4 @@ +[ { + "id" : 485, + "name" : "John" +} ] \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc new file mode 100644 index 0000000..baa6c05 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc @@ -0,0 +1,16 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": { + "type": "record", + "name": "nameRecord", + "fields": [ + { "name": "last", "type": "string" }, + { "name": "first", "type": "string" } + ] + }} + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc new file mode 100644 index 0000000..eb6c644 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc @@ -0,0 +1,10 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "firstName", "type": "string" }, + { "name": "lastName", "type": "string" } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc new file mode 100644 index 0000000..2fdfd1e --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc @@ -0,0 +1,9 @@ +{ + "name": "personWithNameRecord", + "namespace": "nifi", + "type": "record", + "fields": [ + { "name": "id", "type": "int" }, + { "name": "name", "type": "string" } + ] +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/b1901d5f/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/.gitignore ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-services/nifi-lookup-services-bundle/nifi-lookup-services/.gitignore @@ -0,0 +1 @@ +/bin/
