Repository: nifi Updated Branches: refs/heads/master a3b72f1bb -> 3906d4e1d
http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java index 0acf6ff..fb28b17 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java @@ -36,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter; import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter; import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter; import org.apache.nifi.schema.access.NopSchemaAccessWriter; @@ -63,6 +64,11 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes", "The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if " + "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data."); + static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new AllowableValue("confluent-encoded", "Confluent Schema Registry Reference", + "The content of the FlowFile will contain a reference to a schema in the Schema Registry service. The reference is encoded as a single " + + "'Magic Byte' followed by 4 bytes representing the identifier of the schema, as outlined at http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html. " + + "This will be prepended to each FlowFile. Note that if the schema for a record does not contain the necessary identifier and version, " + + "an Exception will be thrown when attempting to write the data. This is based on the encoding used by version 3.2.x of the Confluent Schema Registry."); static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", "Do Not Write Schema", "Do not add any schema-related information to the FlowFile."); /** @@ -73,8 +79,6 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder() .name("Schema Write Strategy") .description("Specifies how the schema for a Record should be added to the data.") - .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA) - .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue()) .required(true) .build(); @@ -83,7 +87,7 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic private volatile SchemaAccessWriter schemaAccessWriter; private final List<AllowableValue> schemaWriteStrategyList = Collections.unmodifiableList(Arrays.asList( - SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)); + SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA)); private final List<AllowableValue> schemaAccessStrategyList = Collections.unmodifiableList(Arrays.asList( SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY)); @@ -156,6 +160,8 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic return new HortonworksEncodedSchemaReferenceWriter(); } else if (allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) { return new HortonworksAttributeSchemaReferenceWriter(); + } else if (allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) { + return new ConfluentSchemaRegistryWriter(); } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) { return new NopSchemaAccessWriter(); } http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index ddcfb0c..53b030a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -51,6 +51,7 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPER import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY; +import static org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA; public abstract class SchemaRegistryService extends AbstractControllerService { @@ -59,7 +60,7 @@ public abstract class SchemaRegistryService extends AbstractControllerService { private static final InputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); private final List<AllowableValue> strategyList = Collections.unmodifiableList(Arrays.asList( - SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)); + SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA)); protected PropertyDescriptor getSchemaAcessStrategyDescriptor() { return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName()); http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml index 28ff134..06dc5ab 100755 --- a/nifi-nar-bundles/pom.xml +++ b/nifi-nar-bundles/pom.xml @@ -35,6 +35,7 @@ <module>nifi-kafka-bundle</module> <module>nifi-kite-bundle</module> <module>nifi-solr-bundle</module> + <module>nifi-confluent-platform-bundle</module> <module>nifi-aws-bundle</module> <module>nifi-social-media-bundle</module> <module>nifi-enrich-bundle</module> http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a66cbf1..7d840dc 100644 --- a/pom.xml +++ b/pom.xml @@ -1278,6 +1278,12 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-platform-nar</artifactId> + <version>1.4.0-SNAPSHOT</version> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-kerberos-iaa-providers-nar</artifactId> <version>1.4.0-SNAPSHOT</version> <type>nar</type> @@ -1456,7 +1462,7 @@ <version>1.4.0-SNAPSHOT</version> <type>nar</type> </dependency> - <dependency> + <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-properties</artifactId> <version>1.4.0-SNAPSHOT</version> @@ -1473,6 +1479,11 @@ </dependency> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-confluent-schema-registry-service</artifactId> + <version>1.4.0-SNAPSHOT</version> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-logging-utils</artifactId> <version>1.4.0-SNAPSHOT</version> </dependency>
