NIFI-4935 Refactoring to support specifying schema branch or schema version 
when using schema by name strategy

This closes #2523.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/de71a41b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/de71a41b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/de71a41b

Branch: refs/heads/master
Commit: de71a41bd0d2d8fd90884d8a246ef4c537d48196
Parents: 930417b
Author: Bryan Bende <[email protected]>
Authored: Mon Mar 5 17:02:20 2018 -0500
Committer: Mark Payne <[email protected]>
Committed: Thu Mar 15 16:16:12 2018 -0400

----------------------------------------------------------------------
 .../apache/nifi/schema/access/SchemaField.java  |   3 +-
 .../schema/access/SchemaNotFoundException.java  |   2 +-
 .../serialization/record/SchemaIdentifier.java  |  28 ++-
 .../record/StandardSchemaIdentifier.java        |  60 +++++-
 .../schemaregistry/ConfluentSchemaRegistry.java |  63 ++++---
 .../client/RestSchemaRegistryClient.java        |   2 +-
 .../nifi/schema/access/SchemaAccessUtils.java   |  55 +++++-
 .../record/MockSchemaRegistry.java              |  59 +++---
 .../nifi-standard-record-utils/pom.xml          |   6 +
 .../access/ConfluentSchemaRegistryStrategy.java |  17 +-
 .../access/ConfluentSchemaRegistryWriter.java   |  10 +-
 ...onworksAttributeSchemaReferenceStrategy.java |   5 +-
 ...rtonworksAttributeSchemaReferenceWriter.java |  11 +-
 ...rtonworksEncodedSchemaReferenceStrategy.java |   4 +-
 ...HortonworksEncodedSchemaReferenceWriter.java |  10 +-
 .../schema/access/SchemaNameAsAttribute.java    |  28 ++-
 .../access/SchemaNamePropertyStrategy.java      |  34 +++-
 .../AbstractSchemaAccessStrategyTest.java       |  51 +++++
 .../schema/access/SchemaIdentifierMatcher.java  |  42 +++++
 .../TestConfluentSchemaRegistryStrategy.java    |  81 ++++++++
 .../TestConfluentSchemaRegistryWriter.java      |  76 ++++++++
 ...onworksAttributeSchemaReferenceStrategy.java |  64 +++++++
 ...rtonworksAttributeSchemaReferenceWriter.java |  99 ++++++++++
 ...rtonworksEncodedSchemaReferenceStrategy.java |  87 +++++++++
 ...HortonworksEncodedSchemaReferenceWriter.java |  12 +-
 .../access/TestSchemaNameAsAttribute.java       |  90 +++++++++
 .../access/TestSchemaNamePropertyStrategy.java  | 148 +++++++++++++++
 .../services/AvroSchemaRegistry.java            |  75 +++-----
 .../services/TestAvroSchemaRegistry.java        |  23 ++-
 .../processors/standard/ValidateRecord.java     |  30 +--
 .../nifi-hwx-schema-registry-service/pom.xml    |   2 +-
 .../hortonworks/HortonworksSchemaRegistry.java  | 189 +++++++++++--------
 .../TestHortonworksSchemaRegistry.java          |  43 +++--
 .../serialization/SchemaRegistryService.java    |   4 +
 .../schemaregistry/services/SchemaRegistry.java |  56 ++++--
 35 files changed, 1276 insertions(+), 293 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
index 2fe06f4..1844eea 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java
@@ -22,7 +22,8 @@ public enum SchemaField {
     SCHEMA_TEXT_FORMAT("Schema Text Format"),
     SCHEMA_NAME("Schema Name"),
     SCHEMA_IDENTIFIER("Schema Identifier"),
-    SCHEMA_VERSION("Schema Version");
+    SCHEMA_VERSION("Schema Version"),
+    SCHEMA_BRANCH_NAME("Schema Branch Name");
 
     private final String description;
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
index 9a064ff..377a176 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java
@@ -23,7 +23,7 @@ public class SchemaNotFoundException extends Exception {
     }
 
     public SchemaNotFoundException(final String message, final Throwable 
cause) {
-        super(cause);
+        super(message, cause);
     }
 
     public SchemaNotFoundException(final Throwable cause) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
index d7f5664..bca408a 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java
@@ -38,14 +38,32 @@ public interface SchemaIdentifier {
      */
     OptionalInt getVersion();
 
+    /**
+     * @return the name of the branch where the schema is located, if one has 
been defined
+     */
+    Optional<String> getBranch();
+
 
-    public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, 
null, null);
+    SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, 
null);
 
-    public static SchemaIdentifier ofName(final String name) {
-        return new StandardSchemaIdentifier(name, null, null);
+    static Builder builder() {
+        return new StandardSchemaIdentifier.Builder();
     }
 
-    public static SchemaIdentifier of(final String name, final long 
identifier, final int version) {
-        return new StandardSchemaIdentifier(name, identifier, version);
+    /**
+     * Implementations should provide a builder to create instances of the 
SchemaIdentifier.
+     */
+    interface Builder {
+
+        Builder name(String name);
+
+        Builder id(Long id);
+
+        Builder version(Integer version);
+
+        Builder branch(String branch);
+
+        SchemaIdentifier build();
+
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
index 86db284..712486b 100644
--- 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
+++ 
b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java
@@ -25,11 +25,17 @@ public class StandardSchemaIdentifier implements 
SchemaIdentifier {
     private final Optional<String> name;
     private final OptionalLong identifier;
     private final OptionalInt version;
+    private final Optional<String> branch;
 
-    StandardSchemaIdentifier(final String name, final Long identifier, final 
Integer version) {
+    StandardSchemaIdentifier(final String name, final Long identifier, final 
Integer version, final String branch) {
         this.name = Optional.ofNullable(name);
         this.identifier = identifier == null ? OptionalLong.empty() : 
OptionalLong.of(identifier);;
         this.version = version == null ? OptionalInt.empty() : 
OptionalInt.of(version);;
+        this.branch = Optional.ofNullable(branch);
+
+        if (this.name == null && this.identifier == null) {
+            throw new IllegalStateException("Name or Identifier must be 
provided");
+        }
     }
 
     @Override
@@ -48,8 +54,13 @@ public class StandardSchemaIdentifier implements 
SchemaIdentifier {
     }
 
     @Override
+    public Optional<String> getBranch() {
+        return branch;
+    }
+
+    @Override
     public int hashCode() {
-        return 31 + 41 * getName().hashCode() + 41 * 
getIdentifier().hashCode() + 41 * getVersion().hashCode();
+        return 31 + 41 * getName().hashCode() + 41 * 
getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * 
getBranch().hashCode();
     }
 
     @Override
@@ -64,6 +75,49 @@ public class StandardSchemaIdentifier implements 
SchemaIdentifier {
             return false;
         }
         final SchemaIdentifier other = (SchemaIdentifier) obj;
-        return getName().equals(other.getName()) && 
getIdentifier().equals(other.getIdentifier()) && 
getVersion().equals(other.getVersion());
+        return getName().equals(other.getName())
+                && getIdentifier().equals(other.getIdentifier())
+                && getVersion().equals(other.getVersion())
+                && getBranch().equals(other.getBranch());
+    }
+
+    /**
+     * Builder to create instances of SchemaIdentifier.
+     */
+    public static class Builder implements SchemaIdentifier.Builder {
+
+        private String name;
+        private String branch;
+        private Long identifier;
+        private Integer version;
+
+        @Override
+        public SchemaIdentifier.Builder name(final String name) {
+            this.name = name;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier.Builder id(final Long id) {
+            this.identifier = id;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier.Builder version(final Integer version) {
+            this.version = version;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier.Builder branch(final String branch) {
+            this.branch = branch;
+            return this;
+        }
+
+        @Override
+        public SchemaIdentifier build() {
+            return new StandardSchemaIdentifier(name, identifier, version, 
branch);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index 113e096..3558286 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.confluent.schemaregistry;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.net.ssl.SSLContext;
-
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -47,9 +34,24 @@ import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.ssl.SSLContextService.ClientAuth;
 
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 
 
 @Tags({"schema", "registry", "confluent", "avro", "kafka"})
@@ -176,28 +178,33 @@ public class ConfluentSchemaRegistry extends 
AbstractControllerService implement
         return baseUrls;
     }
 
-    @Override
-    public String retrieveSchemaText(final String schemaName) throws 
IOException, SchemaNotFoundException {
-        final RecordSchema schema = retrieveSchema(schemaName);
-        return schema.getSchemaText().get();
-    }
+    private RecordSchema retrieveSchemaByName(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        final Optional<String> schemaName = schemaIdentifier.getName();
+        if (!schemaName.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Name is not present");
+        }
 
-    @Override
-    public String retrieveSchemaText(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
-        final RecordSchema schema = retrieveSchema(schemaId, version);
-        return schema.getSchemaText().get();
+        final RecordSchema schema = client.getSchema(schemaName.get());
+        return schema;
     }
 
-    @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws 
IOException, SchemaNotFoundException {
-        final RecordSchema schema = client.getSchema(schemaName);
+    private RecordSchema retrieveSchemaById(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        final OptionalLong schemaId = schemaIdentifier.getIdentifier();
+        if (!schemaId.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Id is not present");
+        }
+
+        final RecordSchema schema = client.getSchema((int) 
schemaId.getAsLong());
         return schema;
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
-        final RecordSchema schema = client.getSchema((int) schemaId);
-        return schema;
+    public RecordSchema retrieveSchema(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        if (schemaIdentifier.getName().isPresent()) {
+            return retrieveSchemaByName(schemaIdentifier);
+        } else {
+            return retrieveSchemaById(schemaIdentifier);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index b2ad19b..14e3e83 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -145,7 +145,7 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
 
         try {
             final Schema avroSchema = new Schema.Parser().parse(schemaText);
-            final SchemaIdentifier schemaId = SchemaIdentifier.of(subject, id, 
version);
+            final SchemaIdentifier schemaId = 
SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
 
             final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
             return recordSchema;

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
index 4f0f945..17aef2a 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java
@@ -16,19 +16,20 @@
  */
 package org.apache.nifi.schema.access;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.nifi.avro.AvroSchemaValidator;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
 public class SchemaAccessUtils {
 
     public static final AllowableValue SCHEMA_NAME_PROPERTY = new 
AllowableValue("schema-name", "Use 'Schema Name' Property",
@@ -77,6 +78,26 @@ public class SchemaAccessUtils {
             .required(false)
             .build();
 
+    public static final PropertyDescriptor SCHEMA_BRANCH_NAME = new 
PropertyDescriptor.Builder()
+            .name("schema-branch")
+            .displayName("Schema Branch")
+            .description("Specifies the name of the branch to use when looking 
up the schema in the Schema Registry property. " +
+                    "If the chosen Schema Registry does not support branching, 
this value will be ignored.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
+    public static final PropertyDescriptor SCHEMA_VERSION = new 
PropertyDescriptor.Builder()
+            .name("schema-version")
+            .displayName("Schema Version")
+            .description("Specifies the version of the schema to lookup in the 
Schema Registry. " +
+                    "If not specified then the latest version of the schema 
will be retrieved.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .required(false)
+            .build();
+
     public static final PropertyDescriptor SCHEMA_TEXT = new 
PropertyDescriptor.Builder()
             .name("schema-text")
             .displayName("Schema Text")
@@ -89,12 +110,15 @@ public class SchemaAccessUtils {
 
     public static Collection<ValidationResult> 
validateSchemaAccessStrategy(final ValidationContext validationContext, final 
String schemaAccessStrategyValue,
                                                                             
final List<AllowableValue> schemaAccessStrategyValues) {
+
+        final Collection<ValidationResult> validationResults = new 
ArrayList<>();
+
         if (isSchemaRegistryRequired(schemaAccessStrategyValue)) {
             final boolean registrySet = 
validationContext.getProperty(SCHEMA_REGISTRY).isSet();
             if (!registrySet) {
                 final String schemaAccessStrategyName = 
getSchemaAccessStrategyName(schemaAccessStrategyValue, 
schemaAccessStrategyValues);
 
-                return Collections.singleton(new ValidationResult.Builder()
+                validationResults.add(new ValidationResult.Builder()
                         .subject("Schema Registry")
                         .explanation("The '" + schemaAccessStrategyName + "' 
Schema Access Strategy requires that the Schema Registry property be set.")
                         .valid(false)
@@ -102,7 +126,21 @@ public class SchemaAccessUtils {
             }
         }
 
-        return Collections.emptyList();
+        // ensure that only branch or version is specified, but not both
+        if 
(SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessStrategyValue)) {
+            final boolean branchNameSet = 
validationContext.getProperty(SCHEMA_BRANCH_NAME).isSet();
+            final boolean versionSet = 
validationContext.getProperty(SCHEMA_VERSION).isSet();
+
+            if (branchNameSet && versionSet) {
+                validationResults.add(new ValidationResult.Builder()
+                        .subject(SCHEMA_BRANCH_NAME.getDisplayName())
+                        .explanation(SCHEMA_BRANCH_NAME.getDisplayName() + " 
and " + SCHEMA_VERSION.getDisplayName() + " cannot be specified together")
+                        .valid(false)
+                        .build());
+            }
+        }
+
+        return validationResults;
     }
 
     private static String getSchemaAccessStrategyName(final String 
schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) {
@@ -123,7 +161,10 @@ public class SchemaAccessUtils {
 
     public static SchemaAccessStrategy getSchemaAccessStrategy(final String 
allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext 
context) {
         if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) {
-            return new SchemaNamePropertyStrategy(schemaRegistry, 
context.getProperty(SCHEMA_NAME));
+            final PropertyValue schemaName = context.getProperty(SCHEMA_NAME);
+            final PropertyValue schemaBranchName = 
context.getProperty(SCHEMA_BRANCH_NAME);
+            final PropertyValue schemaVersion = 
context.getProperty(SCHEMA_VERSION);
+            return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, 
schemaBranchName, schemaVersion);
         } else if 
(allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) {
             return new InheritSchemaFromRecord();
         } else if 
(allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
index 36bbe58..a5ec246 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java
@@ -17,19 +17,21 @@
 
 package org.apache.nifi.serialization.record;
 
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.util.Tuple;
+
 import java.io.IOException;
 import java.util.EnumSet;
 import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.schema.access.SchemaField;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.util.Tuple;
-
 public class MockSchemaRegistry extends AbstractControllerService implements 
SchemaRegistry {
     private final ConcurrentMap<String, RecordSchema> schemaNameMap = new 
ConcurrentHashMap<>();
     private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> 
schemaIdVersionMap = new ConcurrentHashMap<>();
@@ -38,39 +40,38 @@ public class MockSchemaRegistry extends 
AbstractControllerService implements Sch
         schemaNameMap.put(name, schema);
     }
 
-    @Override
-    public String retrieveSchemaText(final String schemaName) throws 
IOException, SchemaNotFoundException {
-        final RecordSchema schema = schemaNameMap.get(schemaName);
-        if (schema == null) {
-            return null;
+    private RecordSchema retrieveSchemaByName(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        final Optional<String> schemaName = schemaIdentifier.getName();
+        if (!schemaName.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Name is not present");
         }
 
-        final Optional<String> text = schema.getSchemaText();
-        return text.orElse(null);
+        return schemaNameMap.get(schemaName);
     }
 
-    @Override
-    public String retrieveSchemaText(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
-        final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
-        final RecordSchema schema = schemaIdVersionMap.get(tuple);
-        if (schema == null) {
-            return null;
+    private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        final OptionalLong schemaId = schemaIdentifier.getIdentifier();
+        if (!schemaId.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Id is not present");
         }
 
-        final Optional<String> text = schema.getSchemaText();
-        return text.orElse(null);
-    }
+        final OptionalInt version = schemaIdentifier.getVersion();
+        if (!version.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Version is not present");
+        }
 
-    @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws 
IOException, SchemaNotFoundException {
-        return schemaNameMap.get(schemaName);
+        final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), 
version.getAsInt());
+        final RecordSchema schema = schemaIdVersionMap.get(tuple);
+        return schema;
     }
 
     @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws IOException, SchemaNotFoundException {
-        final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version);
-        final RecordSchema schema = schemaIdVersionMap.get(tuple);
-        return schema;
+    public RecordSchema retrieveSchema(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        if (schemaIdentifier.getName().isPresent()) {
+            return retrieveSchemaByName(schemaIdentifier);
+        } else {
+            return retrieveSchemaByIdAndVersion(schemaIdentifier);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
index 7b68335..9bbf7a8 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml
@@ -56,5 +56,11 @@
             <artifactId>commons-csv</artifactId>
             <version>1.4</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <version>1.6.0-SNAPSHOT</version>
+           <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
index b71e811..2734103 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java
@@ -17,6 +17,11 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -25,10 +30,6 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.nifi.schemaregistry.services.SchemaRegistry;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.stream.io.StreamUtils;
-
 public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy {
     private final Set<SchemaField> schemaFields;
     private final SchemaRegistry schemaRegistry;
@@ -64,7 +65,13 @@ public class ConfluentSchemaRegistryStrategy implements 
SchemaAccessStrategy {
         }
 
         final int schemaId = bb.getInt();
-        return schemaRegistry.retrieveSchema(schemaId, 1);
+
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
+                .id(Long.valueOf(schemaId))
+                .version(1)
+                .build();
+
+        return schemaRegistry.retrieveSchema(schemaIdentifier);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
index 3677b9f..584f32e 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java
@@ -17,6 +17,9 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -27,16 +30,13 @@ import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final long id = identifier.getIdentifier().getAsLong();
+        final Long id = identifier.getIdentifier().getAsLong();
 
         // This encoding follows the pattern that is provided for serializing 
data by the Confluent Schema Registry serializer
         // as it is provided at:
@@ -45,7 +45,7 @@ public class ConfluentSchemaRegistryWriter implements 
SchemaAccessWriter {
         // representing the schema id.
         final ByteBuffer bb = ByteBuffer.allocate(5);
         bb.put((byte) 0);
-        bb.putInt((int) id);
+        bb.putInt(id.intValue());
 
         out.write(bb.array());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
index 19606c0..f4fdfcb 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java
@@ -19,6 +19,7 @@ package org.apache.nifi.schema.access;
 
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,6 +35,7 @@ public class HortonworksAttributeSchemaReferenceStrategy 
implements SchemaAccess
     public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
     public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = 
"schema.protocol.version";
 
+
     private final SchemaRegistry schemaRegistry;
 
 
@@ -84,7 +86,8 @@ public class HortonworksAttributeSchemaReferenceStrategy 
implements SchemaAccess
         final long schemaId = Long.parseLong(schemaIdentifier);
         final int version = Integer.parseInt(schemaVersion);
 
-        final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, 
version);
+        final SchemaIdentifier identifier = 
SchemaIdentifier.builder().id(schemaId).version(version).build();
+        final RecordSchema schema = schemaRegistry.retrieveSchema(identifier);
         if (schema == null) {
             throw new SchemaNotFoundException("Could not find a Schema in the 
Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + 
version + "'");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
index dd7d676..bea2f96 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java
@@ -29,7 +29,8 @@ import java.util.Set;
 
 public class HortonworksAttributeSchemaReferenceWriter implements 
SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
-    private static final int LATEST_PROTOCOL_VERSION = 1;
+    static final int LATEST_PROTOCOL_VERSION = 1;
+    static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
 
     @Override
     public void writeHeader(RecordSchema schema, OutputStream out) throws 
IOException {
@@ -40,13 +41,17 @@ public class HortonworksAttributeSchemaReferenceWriter 
implements SchemaAccessWr
         final Map<String, String> attributes = new HashMap<>(4);
         final SchemaIdentifier id = schema.getIdentifier();
 
-        final long schemaId = id.getIdentifier().getAsLong();
-        final int schemaVersion = id.getVersion().getAsInt();
+        final Long schemaId = id.getIdentifier().getAsLong();
+        final Integer schemaVersion = id.getVersion().getAsInt();
 
         
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, 
String.valueOf(schemaId));
         
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE,
 String.valueOf(schemaVersion));
         
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE,
 String.valueOf(LATEST_PROTOCOL_VERSION));
 
+        if (id.getBranch().isPresent()) {
+            attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get());
+        }
+
         return attributes;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
index 74bde54..077016a 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java
@@ -19,6 +19,7 @@ package org.apache.nifi.schema.access;
 
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.stream.io.StreamUtils;
 
 import java.io.IOException;
@@ -66,7 +67,8 @@ public class HortonworksEncodedSchemaReferenceStrategy 
implements SchemaAccessSt
         final long schemaId = bb.getLong();
         final int schemaVersion = bb.getInt();
 
-        return schemaRegistry.retrieveSchema(schemaId, schemaVersion);
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build();
+        return schemaRegistry.retrieveSchema(schemaIdentifier);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
index bf6a9ea..cb4ed4e 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java
@@ -17,6 +17,9 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -27,9 +30,6 @@ import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-
 public class HortonworksEncodedSchemaReferenceWriter implements 
SchemaAccessWriter {
     private static final Set<SchemaField> requiredSchemaFields = 
EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
     private static final int LATEST_PROTOCOL_VERSION = 1;
@@ -37,8 +37,8 @@ public class HortonworksEncodedSchemaReferenceWriter 
implements SchemaAccessWrit
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
         final SchemaIdentifier identifier = schema.getIdentifier();
-        final long id = identifier.getIdentifier().getAsLong();
-        final int version = identifier.getVersion().getAsInt();
+        final Long id = identifier.getIdentifier().getAsLong();
+        final Integer version = identifier.getVersion().getAsInt();
 
         // This decoding follows the pattern that is provided for serializing 
data by the Hortonworks Schema Registry serializer
         // as it is provided at:

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
index 54a248d..4842df3 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java
@@ -19,10 +19,11 @@ package org.apache.nifi.schema.access;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.Collections;
 import java.util.EnumSet;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Set;
 
 import org.apache.nifi.serialization.record.RecordSchema;
@@ -30,7 +31,9 @@ import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 public class SchemaNameAsAttribute implements SchemaAccessWriter {
     private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME);
-    private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
+    static final String SCHEMA_NAME_ATTRIBUTE = "schema.name";
+    static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch";
+    static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version";
 
     @Override
     public void writeHeader(final RecordSchema schema, final OutputStream out) 
throws IOException {
@@ -38,17 +41,34 @@ public class SchemaNameAsAttribute implements 
SchemaAccessWriter {
 
     @Override
     public Map<String, String> getAttributes(final RecordSchema schema) {
+        final Map<String,String> attributes = new HashMap<>(3);
+
         final SchemaIdentifier identifier = schema.getIdentifier();
+
         final Optional<String> nameOption = identifier.getName();
         if (nameOption.isPresent()) {
-            return Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, 
nameOption.get());
+            attributes.put(SCHEMA_NAME_ATTRIBUTE, nameOption.get());
+        }
+
+        final OptionalInt versionOption = identifier.getVersion();
+        if (versionOption.isPresent()) {
+            attributes.put(SCHEMA_VERSION_ATTRIBUTE, 
String.valueOf(versionOption.getAsInt()));
         }
-        return Collections.emptyMap();
+
+        final Optional<String> branchOption = identifier.getBranch();
+        if (branchOption.isPresent()) {
+            attributes.put(SCHEMA_BRANCH_ATTRIBUTE, branchOption.get());
+        }
+
+        return attributes;
     }
 
     @Override
     public void validateSchema(final RecordSchema schema) throws 
SchemaNotFoundException {
         final SchemaIdentifier schemaId = schema.getIdentifier();
+        if (schemaId == null) {
+            throw new SchemaNotFoundException("Cannot write Schema Name As 
Attribute because Schema Identifier is not known");
+        }
         if (!schemaId.getName().isPresent()) {
             throw new SchemaNotFoundException("Cannot write Schema Name As 
Attribute because the Schema Name is not known");
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
index 07ba3f3..257fb58 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java
@@ -17,9 +17,11 @@
 
 package org.apache.nifi.schema.access;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 
 import java.io.InputStream;
 import java.util.Collections;
@@ -32,10 +34,17 @@ public class SchemaNamePropertyStrategy implements 
SchemaAccessStrategy {
 
     private final SchemaRegistry schemaRegistry;
     private final PropertyValue schemaNamePropertyValue;
+    private final PropertyValue schemaBranchNamePropertyValue;
+    private final PropertyValue schemaVersionPropertyValue;
 
-    public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, 
final PropertyValue schemaNamePropertyValue) {
+    public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry,
+                                      final PropertyValue 
schemaNamePropertyValue,
+                                      final PropertyValue 
schemaBranchNamePropertyValue,
+                                      final PropertyValue 
schemaVersionPropertyValue) {
         this.schemaRegistry = schemaRegistry;
         this.schemaNamePropertyValue = schemaNamePropertyValue;
+        this.schemaBranchNamePropertyValue = schemaBranchNamePropertyValue;
+        this.schemaVersionPropertyValue = schemaVersionPropertyValue;
 
         schemaFields = new HashSet<>();
         schemaFields.add(SchemaField.SCHEMA_NAME);
@@ -50,12 +59,33 @@ public class SchemaNamePropertyStrategy implements 
SchemaAccessStrategy {
         }
 
         try {
-            final RecordSchema recordSchema = 
schemaRegistry.retrieveSchema(schemaName);
+            final String schemaBranchName = 
schemaBranchNamePropertyValue.evaluateAttributeExpressions(variables).getValue();
+            final String schemaVersion = 
schemaVersionPropertyValue.evaluateAttributeExpressions(variables).getValue();
+
+            final SchemaIdentifier.Builder identifierBuilder = 
SchemaIdentifier.builder();
+            identifierBuilder.name(schemaName);
+
+            if (!StringUtils.isBlank(schemaBranchName)) {
+                identifierBuilder.branch(schemaBranchName);
+            }
+
+            if (!StringUtils.isBlank(schemaVersion)) {
+                try {
+                    identifierBuilder.version(Integer.valueOf(schemaVersion));
+                } catch (NumberFormatException nfe) {
+                    throw new SchemaNotFoundException("Could not retrieve 
schema with name '" + schemaName
+                            + "' because a non-numeric version was supplied '" 
+ schemaVersion + "'", nfe);
+                }
+            }
+
+            final RecordSchema recordSchema = 
schemaRegistry.retrieveSchema(identifierBuilder.build());
             if (recordSchema == null) {
                 throw new SchemaNotFoundException("Could not find a schema 
with name '" + schemaName + "' in the configured Schema Registry");
             }
 
             return recordSchema;
+        } catch (final SchemaNotFoundException snf) {
+            throw snf;
         } catch (final Exception e) {
             throw new SchemaNotFoundException("Could not retrieve schema with 
name '" + schemaName + "' from the configured Schema Registry", e);
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
new file mode 100644
index 0000000..f613ab8
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Before;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AbstractSchemaAccessStrategyTest {
+
+    protected SchemaRegistry schemaRegistry;
+    protected RecordSchema recordSchema;
+
+    @Before
+    public void setup() {
+        this.schemaRegistry = Mockito.mock(SchemaRegistry.class);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
+                .name("person").branch("master").version(1).id(1L).build();
+
+        this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java
new file mode 100644
index 0000000..a99f3f3
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.mockito.ArgumentMatcher;
+
+/**
+ * ArgumentMatcher for SchemaIdentifier.
+ */
+public class SchemaIdentifierMatcher extends ArgumentMatcher<SchemaIdentifier> 
{
+
+    private final SchemaIdentifier expectedIdentifier;
+
+    public SchemaIdentifierMatcher(final SchemaIdentifier expectedIdentifier) {
+        this.expectedIdentifier = expectedIdentifier;
+    }
+
+    @Override
+    public boolean matches(final Object argument) {
+        if (argument == null || !(argument instanceof SchemaIdentifier)) {
+            return false;
+        }
+
+        final SchemaIdentifier other = (SchemaIdentifier) argument;
+        return other.equals(expectedIdentifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java
new file mode 100644
index 0000000..6bd585e
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
+
+public class TestConfluentSchemaRegistryStrategy extends 
AbstractSchemaAccessStrategyTest {
+
+    @Test
+    public void testGetSchemaWithValidEncoding() throws IOException, 
SchemaNotFoundException {
+        final SchemaAccessStrategy schemaAccessStrategy = new 
ConfluentSchemaRegistryStrategy(schemaRegistry);
+
+        final int schemaId = 123456;
+
+        try (final ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(bytesOut)) {
+            out.write(0);
+            out.writeInt(schemaId);
+            out.flush();
+
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(bytesOut.toByteArray())) {
+
+                // the confluent strategy will read the id from the input 
stream and use '1' as the version
+                final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                        .id((long)schemaId)
+                        .version(1)
+                        .build();
+
+                when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                        .thenReturn(recordSchema);
+
+                final RecordSchema retrievedSchema = 
schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
+                assertNotNull(retrievedSchema);
+            }
+        }
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaWithInvalidEncoding() throws IOException, 
SchemaNotFoundException {
+        final SchemaAccessStrategy schemaAccessStrategy = new 
ConfluentSchemaRegistryStrategy(schemaRegistry);
+
+        final int schemaId = 123456;
+
+        try (final ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(bytesOut)) {
+            out.write(1); // write an invalid magic byte
+            out.writeInt(schemaId);
+            out.flush();
+
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(bytesOut.toByteArray())) {
+                schemaAccessStrategy.getSchema(Collections.emptyMap(), in, 
recordSchema);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java
new file mode 100644
index 0000000..8ee8280
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class TestConfluentSchemaRegistryWriter {
+
+    @Test
+    public void testValidateValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(123456L).version(2).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
ConfluentSchemaRegistryWriter();
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateInvalidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("test").build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
ConfluentSchemaRegistryWriter();
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test
+    public void testWriteHeader() throws IOException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(123456L).version(2).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+        final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+        final SchemaAccessWriter schemaAccessWriter = new 
ConfluentSchemaRegistryWriter();
+        schemaAccessWriter.writeHeader(recordSchema, out);
+
+        try (final ByteArrayInputStream bytesIn = new 
ByteArrayInputStream(out.toByteArray());
+             final DataInputStream in = new DataInputStream(bytesIn)) {
+            Assert.assertEquals(0, in.readByte());
+            Assert.assertEquals((int) 
schemaIdentifier.getIdentifier().getAsLong(), in.readInt());
+        }
+    }
+
+    private RecordSchema createRecordSchema(final SchemaIdentifier 
schemaIdentifier) {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields, schemaIdentifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
new file mode 100644
index 0000000..851058f
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
+
+public class TestHortonworksAttributeSchemaReferenceStrategy extends 
AbstractSchemaAccessStrategyTest {
+
+    @Test
+    public void testGetSchemaWithValidAttributes() throws IOException, 
SchemaNotFoundException {
+        final long schemaId = 123456;
+        final int version = 2;
+        final int protocol = 1;
+
+        final Map<String,String> attributes = new HashMap<>();
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, 
String.valueOf(schemaId));
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE,
 String.valueOf(version));
+        
attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE,
 String.valueOf(protocol));
+
+        final SchemaAccessStrategy schemaAccessStrategy = new 
HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .id(schemaId)
+                .version(version)
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaAccessStrategy.getSchema(attributes, null, recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaMissingAttributes() throws IOException, 
SchemaNotFoundException {
+        final SchemaAccessStrategy schemaAccessStrategy = new 
HortonworksAttributeSchemaReferenceStrategy(schemaRegistry);
+        schemaAccessStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
new file mode 100644
index 0000000..ea3be57
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestHortonworksAttributeSchemaReferenceWriter {
+
+    @Test
+    public void testValidateWithValidSchema() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(123456L).version(2).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
HortonworksAttributeSchemaReferenceWriter();
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateWithInvalidSchema() throws SchemaNotFoundException 
{
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("test").build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
HortonworksAttributeSchemaReferenceWriter();
+        schemaAccessWriter.validateSchema(recordSchema);
+    }
+
+    @Test
+    public void testGetAttributesWithoutBranch() {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(123456L).version(2).build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
HortonworksAttributeSchemaReferenceWriter();
+        final Map<String,String> attributes = 
schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(3, attributes.size());
+
+        
Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
+
+        
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
+
+        
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+    }
+
+    @Test
+    public void testGetAttributesWithBranch() {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build();
+        final RecordSchema recordSchema = createRecordSchema(schemaIdentifier);
+
+        final SchemaAccessWriter schemaAccessWriter = new 
HortonworksAttributeSchemaReferenceWriter();
+        final Map<String,String> attributes = 
schemaAccessWriter.getAttributes(recordSchema);
+
+        Assert.assertEquals(4, attributes.size());
+
+        
Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE));
+
+        
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE));
+
+        
Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION),
+                
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE));
+
+        Assert.assertEquals("foo", 
attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE));
+    }
+
+    private RecordSchema createRecordSchema(final SchemaIdentifier 
schemaIdentifier) {
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+        return new SimpleRecordSchema(fields, schemaIdentifier);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java
new file mode 100644
index 0000000..6352eba
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.when;
+
+public class TestHortonworksEncodedSchemaReferenceStrategy extends 
AbstractSchemaAccessStrategyTest {
+
+    @Test
+    public void testGetSchemaWithValidEncoding() throws IOException, 
SchemaNotFoundException {
+        final SchemaAccessStrategy schemaAccessStrategy = new 
HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+
+        final int protocol = 1;
+        final long schemaId = 123456;
+        final int version = 2;
+
+        try (final ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(bytesOut)) {
+            out.write(protocol);
+            out.writeLong(schemaId);
+            out.writeInt(version);
+            out.flush();
+
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(bytesOut.toByteArray())) {
+
+                // the confluent strategy will read the id from the input 
stream and use '1' as the version
+                final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                        .id(schemaId)
+                        .version(version)
+                        .build();
+
+                when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                        .thenReturn(recordSchema);
+
+                final RecordSchema retrievedSchema = 
schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema);
+                assertNotNull(retrievedSchema);
+            }
+        }
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testGetSchemaWithInvalidProtocol() throws IOException, 
SchemaNotFoundException {
+        final SchemaAccessStrategy schemaAccessStrategy = new 
HortonworksEncodedSchemaReferenceStrategy(schemaRegistry);
+
+        final int protocol = 0; // use an invalid protocol
+        final long schemaId = 123456;
+        final int version = 2;
+
+        try (final ByteArrayOutputStream bytesOut = new 
ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(bytesOut)) {
+            out.write(protocol);
+            out.writeLong(schemaId);
+            out.writeInt(version);
+            out.flush();
+
+            try (final ByteArrayInputStream in = new 
ByteArrayInputStream(bytesOut.toByteArray())) {
+                schemaAccessStrategy.getSchema(Collections.emptyMap(), in, 
recordSchema);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
index 5d7f4d7..b0589ea 100644
--- 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java
@@ -17,7 +17,10 @@
 
 package org.apache.nifi.schema.access;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -25,10 +28,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.Collections;
 
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 public class TestHortonworksEncodedSchemaReferenceWriter {
 
@@ -36,7 +36,7 @@ public class TestHortonworksEncodedSchemaReferenceWriter {
     public void testHeader() throws IOException {
         final HortonworksEncodedSchemaReferenceWriter writer = new 
HortonworksEncodedSchemaReferenceWriter();
 
-        final RecordSchema schema = new 
SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.of("name", 48L, 
2));
+        final RecordSchema schema = new 
SimpleRecordSchema(Collections.emptyList(), 
SchemaIdentifier.builder().name("name").id( 48L).version( 2).build());
 
         final byte[] header;
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java
new file mode 100644
index 0000000..be437a5
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class TestSchemaNameAsAttribute {
+
+    private List<RecordField> fields;
+    private SchemaAccessWriter schemaAccessWriter;
+
+    @Before
+    public void setup() {
+        fields = new ArrayList<>();
+        fields.add(new RecordField("firstName", 
RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("lastName", 
RecordFieldType.STRING.getDataType()));
+
+        schemaAccessWriter = new SchemaNameAsAttribute();
+    }
+
+    @Test
+    public void testWriteNameBranchAndVersion() {
+        final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder()
+                .name("person").branch("master").version(1).id(1L).build();
+
+        final RecordSchema schema = new SimpleRecordSchema(fields, 
schemaIdentifier);
+
+        final Map<String,String> attributes = 
schemaAccessWriter.getAttributes(schema);
+        Assert.assertEquals(3, attributes.size());
+        Assert.assertEquals(schemaIdentifier.getName().get(), 
attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE));
+        Assert.assertEquals(schemaIdentifier.getBranch().get(), 
attributes.get(SchemaNameAsAttribute.SCHEMA_BRANCH_ATTRIBUTE));
+        
Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), 
attributes.get(SchemaNameAsAttribute.SCHEMA_VERSION_ATTRIBUTE));
+    }
+
+    @Test
+    public void testWriteOnlyName() {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("person").id(1L).build();
+
+        final RecordSchema schema = new SimpleRecordSchema(fields, 
schemaIdentifier);
+
+        final Map<String,String> attributes = 
schemaAccessWriter.getAttributes(schema);
+        Assert.assertEquals(1, attributes.size());
+        Assert.assertEquals(schemaIdentifier.getName().get(), 
attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE));
+    }
+
+    @Test
+    public void testValidateSchemaWhenValid() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("person").id(1L).build();
+        final RecordSchema schema = new SimpleRecordSchema(fields, 
schemaIdentifier);
+        schemaAccessWriter.validateSchema(schema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateSchemaWhenNoIdentifier() throws 
SchemaNotFoundException {
+        final RecordSchema schema = new SimpleRecordSchema(fields, null);
+        schemaAccessWriter.validateSchema(schema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testValidateSchemaWhenNoName() throws SchemaNotFoundException {
+        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().id(1L).build();
+        final RecordSchema schema = new SimpleRecordSchema(fields, 
schemaIdentifier);
+        schemaAccessWriter.validateSchema(schema);
+    }
+}

Reply via email to