NIFI-810: Merged master into branch
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0636f0e7 Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0636f0e7 Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0636f0e7 Branch: refs/heads/NIFI-655 Commit: 0636f0e731cd28299edd3a6e9db90de5045ab662 Parents: 8e2308b d63cd6b Author: Mark Payne <[email protected]> Authored: Sun Oct 25 11:02:40 2015 -0400 Committer: Mark Payne <[email protected]> Committed: Sun Oct 25 11:02:40 2015 -0400 ---------------------------------------------------------------------- .../src/main/asciidoc/administration-guide.adoc | 4 +- .../src/main/java/MyProcessor.java | 11 +- .../nifi/processors/avro/ConvertAvroToJSON.java | 67 ++++- .../processors/avro/TestConvertAvroToJSON.java | 47 ++- .../processors/aws/AbstractAWSProcessor.java | 2 +- .../nifi/processors/aws/s3/DeleteS3Object.java | 98 ++++++ .../org.apache.nifi.processor.Processor | 1 + .../processors/aws/s3/TestDeleteS3Object.java | 141 +++++++++ .../nifi/controller/FlowUnmarshaller.java | 77 ----- .../src/main/resources/FlowConfiguration.xsd | 2 +- .../src/main/resources/bin/nifi.sh | 96 +++--- .../canvas/new-controller-service-dialog.jsp | 1 - .../partials/canvas/new-processor-dialog.jsp | 1 - .../canvas/new-reporting-task-dialog.jsp | 1 - .../css/new-controller-service-dialog.css | 9 - .../main/webapp/css/new-processor-dialog.css | 9 - .../webapp/css/new-reporting-task-dialog.css | 9 - .../webapp/js/nf/canvas/nf-canvas-toolbox.js | 60 ++-- .../src/main/webapp/js/nf/canvas/nf-settings.js | 140 +++++---- .../processors/kite/AbstractKiteProcessor.java | 11 +- .../nifi/processors/kite/ConvertCSVToAvro.java | 296 ++++++++++--------- .../processors/kite/TestCSVToAvroProcessor.java | 39 +++ .../nifi-standard-prioritizers/pom.xml | 4 + .../PriorityAttributePrioritizer.java | 7 +- .../PriorityAttributePrioritizerTest.java | 17 +- .../nifi-standard-processors/pom.xml | 9 + .../nifi/processors/standard/ExecuteSQL.java | 9 +- .../nifi/processors/standard/InvokeHTTP.java | 1 + .../nifi/processors/standard/ListenHTTP.java | 105 ++++--- .../standard/PutDistributedMapCache.java | 252 ++++++++++++++++ .../servlets/ContentAcknowledgmentServlet.java | 3 +- .../standard/servlets/ListenHTTPServlet.java | 8 +- .../processors/standard/util/JdbcCommon.java | 70 ++++- .../org.apache.nifi.processor.Processor | 1 + .../nifi/processors/standard/TestGetFile.java | 21 +- .../standard/TestPutDistributedMapCache.java | 277 +++++++++++++++++ .../standard/util/TestJdbcCommon.java | 42 +++ .../standard/util/TestJdbcTypesDerby.java | 133 +++++++++ .../standard/util/TestJdbcTypesH2.java | 149 ++++++++++ pom.xml | 2 +- 40 files changed, 1725 insertions(+), 507 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java index b214427,f0ba71a..f0f1630 --- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java +++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java @@@ -35,7 -36,8 +38,7 @@@ import org.apache.nifi.annotation.behav import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.documentation.CapabilityDescription; --import org.apache.nifi.annotation.documentation.Tags; + import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.processor.AbstractProcessor; @@@ -47,8 -50,7 +51,7 @@@ import org.apache.nifi.processor.io.Str @SideEffectFree @SupportsBatching - @Tags({ "json", "avro", "binary" }) -@Tags({"json", "avro", "binary"}) +@InputRequirement(Requirement.INPUT_REQUIRED) @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such " + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this " + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of " http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java index 6f126aa,ea84daa..43b33ff --- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java +++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java @@@ -30,8 -30,7 +30,9 @@@ import org.apache.avro.Schema import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData.Record; + import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@@ -68,114 -66,118 +69,108 @@@ public class ConvertCSVToAvro extends A private static final Validator CHAR_VALIDATOR = new Validator() { @Override -- public ValidationResult validate(String subject, String input, -- ValidationContext context) { ++ public ValidationResult validate(String subject, String input, ValidationContext context) { + // Allows special, escaped characters as input, which is then unescaped and converted to a single character. + // Examples for special characters: \t (or \u0009), \f. + input = unescapeString(input); + return new ValidationResult.Builder() -- .subject(subject) -- .input(input) - .explanation("Only single characters are supported") - .valid(input.length() == 1) - .explanation("Only non-null single characters are supported") - .valid(input.length() == 1 && input.charAt(0) != 0) -- .build(); ++ .subject(subject) ++ .input(input) ++ .explanation("Only non-null single characters are supported") ++ .valid(input.length() == 1 && input.charAt(0) != 0) ++ .build(); } }; private static final Relationship SUCCESS = new Relationship.Builder() -- .name("success") -- .description("Avro content that was converted successfully from CSV") -- .build(); ++ .name("success") ++ .description("Avro content that was converted successfully from CSV") ++ .build(); private static final Relationship FAILURE = new Relationship.Builder() -- .name("failure") -- .description("CSV content that could not be processed") -- .build(); ++ .name("failure") ++ .description("CSV content that could not be processed") ++ .build(); private static final Relationship INCOMPATIBLE = new Relationship.Builder() -- .name("incompatible") -- .description("CSV content that could not be converted") -- .build(); ++ .name("incompatible") ++ .description("CSV content that could not be converted") ++ .build(); @VisibleForTesting -- static final PropertyDescriptor SCHEMA -- = new PropertyDescriptor.Builder() -- .name("Record schema") -- .description("Outgoing Avro schema for each record created from a CSV row") -- .addValidator(SCHEMA_VALIDATOR) -- .expressionLanguageSupported(true) -- .required(true) -- .build(); ++ static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder() ++ .name("Record schema") ++ .description("Outgoing Avro schema for each record created from a CSV row") ++ .addValidator(SCHEMA_VALIDATOR) ++ .expressionLanguageSupported(true) ++ .required(true) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor CHARSET -- = new PropertyDescriptor.Builder() -- .name("CSV charset") -- .description("Character set for CSV files") -- .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) -- .defaultValue(DEFAULTS.charset) -- .build(); ++ static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder() ++ .name("CSV charset") ++ .description("Character set for CSV files") ++ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR) ++ .defaultValue(DEFAULTS.charset) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor DELIMITER -- = new PropertyDescriptor.Builder() -- .name("CSV delimiter") -- .description("Delimiter character for CSV records") -- .addValidator(CHAR_VALIDATOR) -- .defaultValue(DEFAULTS.delimiter) -- .build(); ++ static final PropertyDescriptor DELIMITER = new PropertyDescriptor.Builder() ++ .name("CSV delimiter") ++ .description("Delimiter character for CSV records") ++ .addValidator(CHAR_VALIDATOR) ++ .defaultValue(DEFAULTS.delimiter) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor QUOTE -- = new PropertyDescriptor.Builder() -- .name("CSV quote character") -- .description("Quote character for CSV values") -- .addValidator(CHAR_VALIDATOR) -- .defaultValue(DEFAULTS.quote) -- .build(); ++ static final PropertyDescriptor QUOTE = new PropertyDescriptor.Builder() ++ .name("CSV quote character") ++ .description("Quote character for CSV values") ++ .addValidator(CHAR_VALIDATOR) ++ .defaultValue(DEFAULTS.quote) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor ESCAPE -- = new PropertyDescriptor.Builder() -- .name("CSV escape character") -- .description("Escape character for CSV values") -- .addValidator(CHAR_VALIDATOR) -- .defaultValue(DEFAULTS.escape) -- .build(); ++ static final PropertyDescriptor ESCAPE = new PropertyDescriptor.Builder() ++ .name("CSV escape character") ++ .description("Escape character for CSV values") ++ .addValidator(CHAR_VALIDATOR) ++ .defaultValue(DEFAULTS.escape) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor HAS_HEADER -- = new PropertyDescriptor.Builder() -- .name("Use CSV header line") -- .description("Whether to use the first line as a header") -- .addValidator(StandardValidators.BOOLEAN_VALIDATOR) -- .defaultValue(String.valueOf(DEFAULTS.useHeader)) -- .build(); ++ static final PropertyDescriptor HAS_HEADER = new PropertyDescriptor.Builder() ++ .name("Use CSV header line") ++ .description("Whether to use the first line as a header") ++ .addValidator(StandardValidators.BOOLEAN_VALIDATOR) ++ .defaultValue(String.valueOf(DEFAULTS.useHeader)) ++ .build(); @VisibleForTesting -- static final PropertyDescriptor LINES_TO_SKIP -- = new PropertyDescriptor.Builder() -- .name("Lines to skip") -- .description("Number of lines to skip before reading header or data") -- .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true)) -- .defaultValue(String.valueOf(DEFAULTS.linesToSkip)) -- .build(); -- -- private static final List<PropertyDescriptor> PROPERTIES -- = ImmutableList.<PropertyDescriptor>builder() -- .addAll(AbstractKiteProcessor.getProperties()) -- .add(SCHEMA) -- .add(CHARSET) -- .add(DELIMITER) -- .add(QUOTE) -- .add(ESCAPE) -- .add(HAS_HEADER) -- .add(LINES_TO_SKIP) -- .build(); -- -- private static final Set<Relationship> RELATIONSHIPS -- = ImmutableSet.<Relationship>builder() -- .add(SUCCESS) -- .add(FAILURE) -- .add(INCOMPATIBLE) -- .build(); ++ static final PropertyDescriptor LINES_TO_SKIP = new PropertyDescriptor.Builder() ++ .name("Lines to skip") ++ .description("Number of lines to skip before reading header or data") ++ .addValidator(createLongValidator(0L, Integer.MAX_VALUE, true)) ++ .defaultValue(String.valueOf(DEFAULTS.linesToSkip)) ++ .build(); ++ ++ private static final List<PropertyDescriptor> PROPERTIES = ImmutableList.<PropertyDescriptor> builder() ++ .addAll(AbstractKiteProcessor.getProperties()) ++ .add(SCHEMA) ++ .add(CHARSET) ++ .add(DELIMITER) ++ .add(QUOTE) ++ .add(ESCAPE) ++ .add(HAS_HEADER) ++ .add(LINES_TO_SKIP) ++ .build(); ++ ++ private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder() ++ .add(SUCCESS) ++ .add(FAILURE) ++ .add(INCOMPATIBLE) ++ .build(); // Immutable configuration @VisibleForTesting @@@ -196,26 -198,26 +191,26 @@@ super.setDefaultConfiguration(context); this.props = new CSVProperties.Builder() -- .charset(context.getProperty(CHARSET).getValue()) -- .delimiter(context.getProperty(DELIMITER).getValue()) -- .quote(context.getProperty(QUOTE).getValue()) -- .escape(context.getProperty(ESCAPE).getValue()) -- .hasHeader(context.getProperty(HAS_HEADER).asBoolean()) -- .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger()) -- .build(); ++ .charset(context.getProperty(CHARSET).getValue()) ++ .delimiter(context.getProperty(DELIMITER).getValue()) ++ .quote(context.getProperty(QUOTE).getValue()) ++ .escape(context.getProperty(ESCAPE).getValue()) ++ .hasHeader(context.getProperty(HAS_HEADER).asBoolean()) ++ .linesToSkip(context.getProperty(LINES_TO_SKIP).asInteger()) ++ .build(); } @Override public void onTrigger(ProcessContext context, final ProcessSession session) -- throws ProcessException { ++ throws ProcessException { FlowFile incomingCSV = session.get(); if (incomingCSV == null) { return; } String schemaProperty = context.getProperty(SCHEMA) -- .evaluateAttributeExpressions(incomingCSV) -- .getValue(); ++ .evaluateAttributeExpressions(incomingCSV) ++ .getValue(); final Schema schema; try { schema = getSchema(schemaProperty, DefaultConfiguration.get()); @@@ -225,78 -227,85 +220,87 @@@ return; } -- final DataFileWriter<Record> writer = new DataFileWriter<>( -- AvroUtil.newDatumWriter(schema, Record.class)); -- writer.setCodec(CodecFactory.snappyCodec()); ++ try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) { ++ writer.setCodec(CodecFactory.snappyCodec()); -- try { -- final LongHolder written = new LongHolder(0L); -- final FailureTracker failures = new FailureTracker(); -- -- FlowFile badRecords = session.clone(incomingCSV); -- FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() { -- @Override -- public void process(InputStream in, OutputStream out) throws IOException { -- try (CSVFileReader<Record> reader = new CSVFileReader<>( ++ try { ++ final LongHolder written = new LongHolder(0L); ++ final FailureTracker failures = new FailureTracker(); ++ ++ FlowFile badRecords = session.clone(incomingCSV); ++ FlowFile outgoingAvro = session.write(incomingCSV, new StreamCallback() { ++ @Override ++ public void process(InputStream in, OutputStream out) throws IOException { ++ try (CSVFileReader<Record> reader = new CSVFileReader<>( in, props, schema, Record.class)) { -- reader.initialize(); -- try (DataFileWriter<Record> w = writer.create(schema, out)) { -- while (reader.hasNext()) { -- try { -- Record record = reader.next(); -- w.append(record); -- written.incrementAndGet(); -- } catch (DatasetRecordException e) { -- failures.add(e); ++ reader.initialize(); ++ try (DataFileWriter<Record> w = writer.create(schema, out)) { ++ while (reader.hasNext()) { ++ try { ++ Record record = reader.next(); ++ w.append(record); ++ written.incrementAndGet(); ++ } catch (DatasetRecordException e) { ++ failures.add(e); ++ } } } } } -- } -- }); ++ }); -- long errors = failures.count(); ++ long errors = failures.count(); -- session.adjustCounter("Converted records", written.get(), ++ session.adjustCounter("Converted records", written.get(), false /* update only if file transfer is successful */); -- session.adjustCounter("Conversion errors", errors, ++ session.adjustCounter("Conversion errors", errors, false /* update only if file transfer is successful */); -- if (written.get() > 0L) { -- session.transfer(outgoingAvro, SUCCESS); ++ if (written.get() > 0L) { ++ session.transfer(outgoingAvro, SUCCESS); -- if (errors > 0L) { -- getLogger().warn("Failed to convert {}/{} records from CSV to Avro", -- new Object[] { errors, errors + written.get() }); -- badRecords = session.putAttribute( ++ if (errors > 0L) { ++ getLogger().warn("Failed to convert {}/{} records from CSV to Avro", ++ new Object[] {errors, errors + written.get()}); ++ badRecords = session.putAttribute( badRecords, "errors", failures.summary()); -- session.transfer(badRecords, INCOMPATIBLE); -- } else { -- session.remove(badRecords); -- } ++ session.transfer(badRecords, INCOMPATIBLE); ++ } else { ++ session.remove(badRecords); ++ } -- } else { -- session.remove(outgoingAvro); ++ } else { ++ session.remove(outgoingAvro); -- if (errors > 0L) { -- getLogger().warn("Failed to convert {}/{} records from CSV to Avro", -- new Object[] { errors, errors }); -- badRecords = session.putAttribute( ++ if (errors > 0L) { ++ getLogger().warn("Failed to convert {}/{} records from CSV to Avro", ++ new Object[] {errors, errors}); ++ badRecords = session.putAttribute( badRecords, "errors", failures.summary()); -- } else { -- badRecords = session.putAttribute( ++ } else { ++ badRecords = session.putAttribute( badRecords, "errors", "No incoming records"); ++ } ++ ++ session.transfer(badRecords, FAILURE); } -- session.transfer(badRecords, FAILURE); ++ } catch (ProcessException | DatasetIOException e) { ++ getLogger().error("Failed reading or writing", e); ++ session.transfer(incomingCSV, FAILURE); ++ } catch (DatasetException e) { ++ getLogger().error("Failed to read FlowFile", e); ++ session.transfer(incomingCSV, FAILURE); } - - } catch (ProcessException | DatasetIOException e) { - getLogger().error("Failed reading or writing", e); - session.transfer(incomingCSV, FAILURE); - } catch (DatasetException e) { - getLogger().error("Failed to read FlowFile", e); - session.transfer(incomingCSV, FAILURE); ++ } catch (final IOException ioe) { ++ throw new RuntimeException("Unable to close Avro Writer", ioe); + } + } - } catch (ProcessException | DatasetIOException e) { - getLogger().error("Failed reading or writing", e); - session.transfer(incomingCSV, FAILURE); - } catch (DatasetException e) { - getLogger().error("Failed to read FlowFile", e); - session.transfer(incomingCSV, FAILURE); + private static String unescapeString(String input) { + if (input.length() > 1) { + input = StringEscapeUtils.unescapeJava(input); } + return input; } } http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteSQL.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/InvokeHTTP.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0636f0e7/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java ---------------------------------------------------------------------- diff --cc nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java index 258e122,9ad1703..88b6666 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenHTTP.java @@@ -63,9 -61,8 +63,9 @@@ import org.eclipse.jetty.servlet.Servle import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.QueuedThreadPool; +@InputRequirement(Requirement.INPUT_FORBIDDEN) @Tags({"ingest", "http", "https", "rest", "listen"}) - @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The URL of the Service will be http://{hostname}:{port}/contentListener") + @CapabilityDescription("Starts an HTTP Server that is used to receive FlowFiles from remote sources. The default URI of the Service will be http://{hostname}:{port}/contentListener") public class ListenHTTP extends AbstractSessionFactoryProcessor { private Set<Relationship> relationships;
