Repository: nifi
Updated Branches:
  refs/heads/master a3b72f1bb -> 3906d4e1d


http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
index 0acf6ff..fb28b17 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryRecordSetWriter.java
@@ -36,6 +36,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.schema.access.ConfluentSchemaRegistryWriter;
 import org.apache.nifi.schema.access.HortonworksAttributeSchemaReferenceWriter;
 import org.apache.nifi.schema.access.HortonworksEncodedSchemaReferenceWriter;
 import org.apache.nifi.schema.access.NopSchemaAccessWriter;
@@ -63,6 +64,11 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     static final AllowableValue HWX_SCHEMA_REF_ATTRIBUTES = new 
AllowableValue("hwx-schema-ref-attributes", "HWX Schema Reference Attributes",
         "The FlowFile will be given a set of 3 attributes to describe the 
schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. 
Note that if "
             + "the schema for a record does not contain the necessary 
identifier and version, an Exception will be thrown when attempting to write 
the data.");
+    static final AllowableValue CONFLUENT_ENCODED_SCHEMA = new 
AllowableValue("confluent-encoded", "Confluent Schema Registry Reference",
+        "The content of the FlowFile will contain a reference to a schema in 
the Schema Registry service. The reference is encoded as a single "
+            + "'Magic Byte' followed by 4 bytes representing the identifier of 
the schema, as outlined at 
http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html.
 "
+            + "This will be prepended to each FlowFile. Note that if the 
schema for a record does not contain the necessary identifier and version, "
+            + "an Exception will be thrown when attempting to write the data. 
This is based on the encoding used by version 3.2.x of the Confluent Schema 
Registry.");
     static final AllowableValue NO_SCHEMA = new AllowableValue("no-schema", 
"Do Not Write Schema", "Do not add any schema-related information to the 
FlowFile.");
 
     /**
@@ -73,8 +79,6 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new 
PropertyDescriptor.Builder()
         .name("Schema Write Strategy")
         .description("Specifies how the schema for a Record should be added to 
the data.")
-        .allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA)
-        .defaultValue(SCHEMA_NAME_ATTRIBUTE.getValue())
         .required(true)
         .build();
 
@@ -83,7 +87,7 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
     private volatile SchemaAccessWriter schemaAccessWriter;
 
     private final List<AllowableValue> schemaWriteStrategyList = 
Collections.unmodifiableList(Arrays.asList(
-        SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, NO_SCHEMA));
+        SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, 
HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA, 
CONFLUENT_ENCODED_SCHEMA, NO_SCHEMA));
     private final List<AllowableValue> schemaAccessStrategyList = 
Collections.unmodifiableList(Arrays.asList(
         SCHEMA_NAME_PROPERTY, INHERIT_RECORD_SCHEMA, SCHEMA_TEXT_PROPERTY));
 
@@ -156,6 +160,8 @@ public abstract class SchemaRegistryRecordSetWriter extends 
SchemaRegistryServic
             return new HortonworksEncodedSchemaReferenceWriter();
         } else if 
(allowableValue.equalsIgnoreCase(HWX_SCHEMA_REF_ATTRIBUTES.getValue())) {
             return new HortonworksAttributeSchemaReferenceWriter();
+        } else if 
(allowableValue.equalsIgnoreCase(CONFLUENT_ENCODED_SCHEMA.getValue())) {
+            return new ConfluentSchemaRegistryWriter();
         } else if (allowableValue.equalsIgnoreCase(NO_SCHEMA.getValue())) {
             return new NopSchemaAccessWriter();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index ddcfb0c..53b030a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -51,6 +51,7 @@ import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPER
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
 import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT_PROPERTY;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.CONFLUENT_ENCODED_SCHEMA;
 
 public abstract class SchemaRegistryService extends AbstractControllerService {
 
@@ -59,7 +60,7 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
     private static final InputStream EMPTY_INPUT_STREAM = new 
ByteArrayInputStream(new byte[0]);
 
     private final List<AllowableValue> strategyList = 
Collections.unmodifiableList(Arrays.asList(
-        SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, 
HWX_CONTENT_ENCODED_SCHEMA));
+        SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY, HWX_SCHEMA_REF_ATTRIBUTES, 
HWX_CONTENT_ENCODED_SCHEMA, CONFLUENT_ENCODED_SCHEMA));
 
     protected PropertyDescriptor getSchemaAcessStrategyDescriptor() {
         return getPropertyDescriptor(SCHEMA_ACCESS_STRATEGY.getName());

http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/nifi-nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 28ff134..06dc5ab 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -35,6 +35,7 @@
         <module>nifi-kafka-bundle</module>
         <module>nifi-kite-bundle</module>
         <module>nifi-solr-bundle</module>
+        <module>nifi-confluent-platform-bundle</module>
         <module>nifi-aws-bundle</module>
         <module>nifi-social-media-bundle</module>
         <module>nifi-enrich-bundle</module>

http://git-wip-us.apache.org/repos/asf/nifi/blob/3906d4e1/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a66cbf1..7d840dc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1278,6 +1278,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-confluent-platform-nar</artifactId>
+                <version>1.4.0-SNAPSHOT</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-kerberos-iaa-providers-nar</artifactId>
                 <version>1.4.0-SNAPSHOT</version>
                 <type>nar</type>
@@ -1456,7 +1462,7 @@
                 <version>1.4.0-SNAPSHOT</version>
                 <type>nar</type>
             </dependency>
-           <dependency>
+               <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>1.4.0-SNAPSHOT</version>
@@ -1473,6 +1479,11 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>nifi-confluent-schema-registry-service</artifactId>
+                <version>1.4.0-SNAPSHOT</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-logging-utils</artifactId>
                 <version>1.4.0-SNAPSHOT</version>
             </dependency>

Reply via email to