http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNamePropertyStrategy.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNamePropertyStrategy.java
 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNamePropertyStrategy.java
new file mode 100644
index 0000000..bfbc1e0
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNamePropertyStrategy.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schema.access;
+
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.util.MockPropertyValue;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.when;
+
+public class TestSchemaNamePropertyStrategy extends 
AbstractSchemaAccessStrategyTest {
+
+    @Test
+    public void testNameOnly() throws SchemaNotFoundException, IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue(null);
+        final PropertyValue versionValue = new MockPropertyValue(null);
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(nameValue.getValue())
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test
+    public void testNameAndVersion() throws SchemaNotFoundException, 
IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue(null);
+        final PropertyValue versionValue = new MockPropertyValue("1");
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(nameValue.getValue())
+                .version(versionValue.asInteger())
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test
+    public void testNameAndBlankVersion() throws SchemaNotFoundException, 
IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue(null);
+        final PropertyValue versionValue = new MockPropertyValue("   ");
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(nameValue.getValue())
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test(expected = SchemaNotFoundException.class)
+    public void testNameAndNonNumericVersion() throws SchemaNotFoundException, 
IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue(null);
+        final PropertyValue versionValue = new MockPropertyValue("XYZ");
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+    }
+
+    @Test
+    public void testNameAndBranch() throws SchemaNotFoundException, 
IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue("test");
+        final PropertyValue versionValue = new MockPropertyValue(null);
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(nameValue.getValue())
+                .branch(branchValue.getValue())
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+    @Test
+    public void testNameAndBlankBranch() throws SchemaNotFoundException, 
IOException {
+        final PropertyValue nameValue = new MockPropertyValue("person");
+        final PropertyValue branchValue = new MockPropertyValue("  ");
+        final PropertyValue versionValue = new MockPropertyValue(null);
+
+        final SchemaNamePropertyStrategy schemaNamePropertyStrategy = new 
SchemaNamePropertyStrategy(
+                schemaRegistry, nameValue, branchValue, versionValue);
+
+        final SchemaIdentifier expectedSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(nameValue.getValue())
+                .build();
+
+        when(schemaRegistry.retrieveSchema(argThat(new 
SchemaIdentifierMatcher(expectedSchemaIdentifier))))
+                .thenReturn(recordSchema);
+
+        final RecordSchema retrievedSchema = 
schemaNamePropertyStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema);
+        assertNotNull(retrievedSchema);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
index 785d729..1b7ee8b 100644
--- 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java
@@ -16,31 +16,14 @@
  */
 package org.apache.nifi.schemaregistry.services;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.stream.Collectors;
-
 import org.apache.avro.Schema;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.reporting.InitializationException;
@@ -49,13 +32,24 @@ import 
org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
 @Tags({"schema", "registry", "avro", "json", "csv"})
 @CapabilityDescription("Provides a service for registering and accessing 
schemas. You can register a schema "
     + "as a dynamic property where 'name' represents the schema name and 
'value' represents the textual "
     + "representation of the actual schema following the syntax and semantics 
of Avro's Schema format.")
 public class AvroSchemaRegistry extends AbstractControllerService implements 
SchemaRegistry {
     private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, 
SchemaField.SCHEMA_TEXT_FORMAT);
-    private final Map<String, String> schemaNameToSchemaMap;
     private final ConcurrentMap<String, RecordSchema> recordSchemas = new 
ConcurrentHashMap<>();
 
     static final PropertyDescriptor VALIDATE_FIELD_NAMES = new 
PropertyDescriptor.Builder()
@@ -70,9 +64,6 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
 
     private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
 
-    public AvroSchemaRegistry() {
-        this.schemaNameToSchemaMap = new HashMap<>();
-    }
 
     @Override
     protected void init(ControllerServiceInitializationContext config) throws 
InitializationException {
@@ -92,7 +83,7 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
                 try {
                     // Use a non-strict parser here, a strict parse can be 
done (if specified) in customValidate().
                     final Schema avroSchema = new 
Schema.Parser().setValidate(false).parse(newValue);
-                    final SchemaIdentifier schemaId = 
SchemaIdentifier.ofName(descriptor.getName());
+                    final SchemaIdentifier schemaId = 
SchemaIdentifier.builder().name(descriptor.getName()).build();
                     final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema, newValue, schemaId);
                     recordSchemas.put(descriptor.getName(), recordSchema);
                 } catch (final Exception e) {
@@ -127,18 +118,7 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
         return results;
     }
 
-    @Override
-    public String retrieveSchemaText(final String schemaName) throws 
SchemaNotFoundException {
-        final String schemaText = schemaNameToSchemaMap.get(schemaName);
-        if (schemaText == null) {
-            throw new SchemaNotFoundException("Unable to find schema with name 
'" + schemaName + "'");
-        }
-
-        return schemaText;
-    }
-
-    @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws 
SchemaNotFoundException {
+    private RecordSchema retrieveSchemaByName(final String schemaName) throws 
SchemaNotFoundException {
         final RecordSchema recordSchema = recordSchemas.get(schemaName);
         if (recordSchema == null) {
             throw new SchemaNotFoundException("Unable to find schema with name 
'" + schemaName + "'");
@@ -147,26 +127,13 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch
     }
 
     @Override
-    public RecordSchema retrieveSchema(long schemaId, int version) throws 
IOException, SchemaNotFoundException {
-        throw new SchemaNotFoundException("This Schema Registry does not 
support schema lookup by identifier and version - only by name.");
-    }
-
-    @Override
-    public String retrieveSchemaText(long schemaId, int version) throws 
IOException, SchemaNotFoundException {
-        throw new SchemaNotFoundException("This Schema Registry does not 
support schema lookup by identifier and version - only by name.");
-    }
-
-    @OnDisabled
-    public void close() throws Exception {
-        schemaNameToSchemaMap.clear();
-    }
-
-
-    @OnEnabled
-    public void enable(final ConfigurationContext configurationContext) throws 
InitializationException {
-        
this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream()
-            .filter(propEntry -> propEntry.getKey().isDynamic())
-            .collect(Collectors.toMap(propEntry -> 
propEntry.getKey().getName(), propEntry -> propEntry.getValue())));
+    public RecordSchema retrieveSchema(final SchemaIdentifier 
schemaIdentifier) throws IOException, SchemaNotFoundException {
+        final Optional<String> schemaName = schemaIdentifier.getName();
+        if (schemaName.isPresent()) {
+            return retrieveSchemaByName(schemaName.get());
+        } else {
+            throw new SchemaNotFoundException("This Schema Registry only 
supports retrieving a schema by name.");
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
index 67a0959..0d72d81 100644
--- 
a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java
@@ -31,6 +31,8 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,8 +41,7 @@ public class TestAvroSchemaRegistry {
     @Test
     public void validateSchemaRegistrationFromrDynamicProperties() throws 
Exception {
         String schemaName = "fooSchema";
-        ConfigurationContext configContext = mock(ConfigurationContext.class);
-        Map<PropertyDescriptor, String> properties = new HashMap<>();
+
         PropertyDescriptor fooSchema = new PropertyDescriptor.Builder()
             .name(schemaName)
             .dynamic(true)
@@ -54,20 +55,20 @@ public class TestAvroSchemaRegistry {
             .name("barSchema")
             .dynamic(false)
             .build();
-        properties.put(fooSchema, fooSchemaText);
-        properties.put(barSchema, "");
-        when(configContext.getProperties()).thenReturn(properties);
+
         AvroSchemaRegistry delegate = new AvroSchemaRegistry();
-        delegate.enable(configContext);
-        String locatedSchemaText = delegate.retrieveSchemaText(schemaName);
-        assertEquals(fooSchemaText, locatedSchemaText);
+        delegate.onPropertyModified(fooSchema, null, fooSchemaText);
+        delegate.onPropertyModified(barSchema, null, "");
+
+        SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name(schemaName).build();
+        RecordSchema locatedSchema = delegate.retrieveSchema(schemaIdentifier);
+        assertEquals(fooSchemaText, locatedSchema.getSchemaText().get());
         try {
-            delegate.retrieveSchemaText("barSchema");
+            
delegate.retrieveSchema(SchemaIdentifier.builder().name("barSchema").build());
             Assert.fail("Expected a SchemaNotFoundException to be thrown but 
it was not");
         } catch (final SchemaNotFoundException expected) {
         }
 
-        delegate.close();
     }
 
     @Test
@@ -109,7 +110,5 @@ public class TestAvroSchemaRegistry {
         when(propertyValue.asBoolean()).thenReturn(false);
         results = delegate.customValidate(validationContext);
         results.forEach(result -> assertTrue(result.isValid()));
-
-        delegate.close();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
index 5dbc305..06d7db7 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -18,19 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Parser;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -70,10 +57,24 @@ import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.RawRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
 import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
 import org.apache.nifi.serialization.record.validation.ValidationError;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
 @EventDriven
 @SideEffectFree
 @SupportsBatching
@@ -450,7 +451,8 @@ public class ValidateRecord extends AbstractProcessor {
         } else if 
(schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
             final SchemaRegistry schemaRegistry = 
context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
             final String schemaName = 
context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
-            return schemaRegistry.retrieveSchema(schemaName);
+            final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name(schemaName).build();
+            return schemaRegistry.retrieveSchema(schemaIdentifier);
         } else if 
(schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
             final String schemaText = 
context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
             final Parser parser = new Schema.Parser();

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index ab40507..0c656db 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -28,7 +28,7 @@ limitations under the License.
     <artifactId>nifi-hwx-schema-registry-service</artifactId>
     <packaging>jar</packaging>
     <properties>
-        <hwx.registry.version>0.3.0</hwx.registry.version>
+        <hwx.registry.version>0.5.1</hwx.registry.version>
         <jackson.version>2.9.1</jackson.version>
     </properties>
     <dependencies>

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index f37c927..88325ad 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -16,23 +16,19 @@
  */
 package org.apache.nifi.schemaregistry.hortonworks;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.TimeUnit;
-
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import 
com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
 import org.apache.avro.Schema;
-import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
@@ -44,21 +40,28 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.util.Tuple;
 
-import com.hortonworks.registries.schemaregistry.SchemaMetadata;
-import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
-import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
-import 
com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
 
 @Tags({"schema", "registry", "avro", "hortonworks", "hwx"})
 @CapabilityDescription("Provides a Schema Registry Service that interacts with 
a Hortonworks Schema Registry, available at 
https://github.com/hortonworks/registry")
 public class HortonworksSchemaRegistry extends AbstractControllerService 
implements SchemaRegistry {
-    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
+    private static final Set<SchemaField> schemaFields = 
EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, 
SchemaField.SCHEMA_TEXT,
         SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, 
SchemaField.SCHEMA_VERSION);
 
     private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> 
schemaNameToSchemaMap = new ConcurrentHashMap<>();
-    private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> 
schemaVersionByNameCache = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Tuple<String,String>, Tuple<SchemaVersionInfo, 
Long>> schemaVersionByNameCache = new ConcurrentHashMap<>();
     private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, 
Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>();
 
     private volatile long versionInfoCacheNanos;
@@ -149,10 +152,13 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
         return schemaRegistryClient;
     }
 
-    private SchemaVersionInfo getLatestSchemaVersionInfo(final 
SchemaRegistryClient client, final String schemaName) throws 
org.apache.nifi.schema.access.SchemaNotFoundException {
+
+    private SchemaVersionInfo getLatestSchemaVersionInfo(final 
SchemaRegistryClient client, final String schemaName, final String branchName)
+            throws org.apache.nifi.schema.access.SchemaNotFoundException {
         try {
             // Try to fetch the SchemaVersionInfo from the cache.
-            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = 
schemaVersionByNameCache.get(schemaName);
+            final Tuple<String,String> nameAndBranch = new Tuple<>(schemaName, 
branchName);
+            final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = 
schemaVersionByNameCache.get(nameAndBranch);
 
             // Determine if the timestampedVersionInfo is expired
             boolean fetch = false;
@@ -169,14 +175,20 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
             }
 
             // schema version info was expired or not found in cache. Fetch 
from schema registry
-            final SchemaVersionInfo versionInfo = 
client.getLatestSchemaVersionInfo(schemaName);
+            final SchemaVersionInfo versionInfo;
+            if (StringUtils.isBlank(branchName)) {
+                versionInfo = client.getLatestSchemaVersionInfo(schemaName);
+            } else {
+                versionInfo = client.getLatestSchemaVersionInfo(branchName, 
schemaName);
+            }
+
             if (versionInfo == null) {
                 throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
             }
 
             // Store new version in cache.
             final Tuple<SchemaVersionInfo, Long> tuple = new 
Tuple<>(versionInfo, System.nanoTime());
-            schemaVersionByNameCache.put(schemaName, tuple);
+            schemaVersionByNameCache.put(nameAndBranch, tuple);
             return versionInfo;
         } catch (final SchemaNotFoundException e) {
             throw new org.apache.nifi.schema.access.SchemaNotFoundException(e);
@@ -217,23 +229,23 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
         }
     }
 
-    @Override
-    public String retrieveSchemaText(final String schemaName) throws 
org.apache.nifi.schema.access.SchemaNotFoundException {
-        final SchemaVersionInfo latest = 
getLatestSchemaVersionInfo(getClient(), schemaName);
-        return latest.getSchemaText();
-    }
-
+    private RecordSchema retrieveSchemaByName(final SchemaIdentifier 
schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, 
IOException {
 
-    @Override
-    public RecordSchema retrieveSchema(final String schemaName) throws 
org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
         final SchemaRegistryClient client = getClient();
 
         final SchemaVersionInfo versionInfo;
         final Long schemaId;
-        final Integer version;
+
+        final Optional<String> schemaName = schemaIdentifier.getName();
+        if (!schemaName.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Name is not present");
+        }
+
+        final Optional<String> schemaBranchName = schemaIdentifier.getBranch();
+        final OptionalInt schemaVersion = schemaIdentifier.getVersion();
 
         try {
-            final SchemaMetadataInfo metadataInfo = 
client.getSchemaMetadataInfo(schemaName);
+            final SchemaMetadataInfo metadataInfo = 
client.getSchemaMetadataInfo(schemaName.get());
             if (metadataInfo == null) {
                 throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
             }
@@ -243,61 +255,59 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
                 throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
             }
 
-            versionInfo = getLatestSchemaVersionInfo(client, schemaName);
-            version = versionInfo.getVersion();
-            if (version == null) {
-                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with name '" + schemaName + "'");
+            // possible scenarios are name only, name + branch, or name + 
version
+            if (schemaVersion.isPresent()) {
+                final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName.get(), schemaVersion.getAsInt());
+                versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
+            } else {
+                versionInfo = getLatestSchemaVersionInfo(client, 
schemaName.get(), schemaBranchName.orElse(null));
+            }
+
+            if (versionInfo == null || versionInfo.getVersion() == null) {
+                final String message = createErrorMessage("Could not find 
schema", schemaName, schemaBranchName, schemaVersion);
+                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException(message);
             }
+
         } catch (final Exception e) {
-            handleException("Failed to retrieve schema with name '" + 
schemaName + "'", e);
+            final String message = createErrorMessage("Failed to retrieve 
schema", schemaName, schemaBranchName, schemaVersion);
+            handleException(message, e);
             return null;
         }
 
         final String schemaText = versionInfo.getSchemaText();
-        final SchemaIdentifier schemaIdentifier = (schemaId == null || version 
== null) ? SchemaIdentifier.ofName(schemaName) : 
SchemaIdentifier.of(schemaName, schemaId, version);
 
-        final Tuple<SchemaIdentifier, String> tuple = new 
Tuple<>(schemaIdentifier, schemaText);
+        final SchemaIdentifier resultSchemaIdentifier = 
SchemaIdentifier.builder()
+                .id(schemaId)
+                .name(schemaName.get())
+                .branch(schemaBranchName.orElse(null))
+                .version(versionInfo.getVersion())
+                .build();
+
+        final Tuple<SchemaIdentifier, String> tuple = new 
Tuple<>(resultSchemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return AvroTypeUtil.createSchema(schema, schemaText, 
schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, 
resultSchemaIdentifier);
         });
     }
 
-
-    @Override
-    public String retrieveSchemaText(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
+    private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier 
schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, 
IOException {
         final SchemaRegistryClient client = getClient();
 
-        try {
-            final SchemaMetadataInfo info = 
client.getSchemaMetadataInfo(schemaId);
-            if (info == null) {
-                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
-            }
-
-            final SchemaMetadata metadata = info.getSchemaMetadata();
-            final String schemaName = metadata.getName();
-
-            final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
-            final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, 
schemaVersionKey);
-            if (versionInfo == null) {
-                throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
-            }
+        final String schemaName;
+        final SchemaVersionInfo versionInfo;
 
-            return versionInfo.getSchemaText();
-        } catch (final Exception e) {
-            handleException("Failed to retrieve schema with ID '" + schemaId + 
"' and version '" + version + "'", e);
-            return null;
+        final OptionalLong schemaId = schemaIdentifier.getIdentifier();
+        if (!schemaId.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Id is not present");
         }
-    }
 
-    @Override
-    public RecordSchema retrieveSchema(final long schemaId, final int version) 
throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException {
-        final SchemaRegistryClient client = getClient();
+        final OptionalInt version = schemaIdentifier.getVersion();
+        if (!version.isPresent()) {
+            throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Version is not present");
+        }
 
-        final String schemaName;
-        final SchemaVersionInfo versionInfo;
         try {
-            final SchemaMetadataInfo info = 
client.getSchemaMetadataInfo(schemaId);
+            final SchemaMetadataInfo info = 
client.getSchemaMetadataInfo(schemaId.getAsLong());
             if (info == null) {
                 throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
             }
@@ -305,7 +315,7 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
             final SchemaMetadata metadata = info.getSchemaMetadata();
             schemaName = metadata.getName();
 
-            final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version);
+            final SchemaVersionKey schemaVersionKey = new 
SchemaVersionKey(schemaName, version.getAsInt());
             versionInfo = getSchemaVersionInfo(client, schemaVersionKey);
             if (versionInfo == null) {
                 throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema 
with ID '" + schemaId + "' and version '" + version + "'");
@@ -317,14 +327,45 @@ public class HortonworksSchemaRegistry extends 
AbstractControllerService impleme
 
         final String schemaText = versionInfo.getSchemaText();
 
-        final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.of(schemaName, schemaId, version);
-        final Tuple<SchemaIdentifier, String> tuple = new 
Tuple<>(schemaIdentifier, schemaText);
+        final SchemaIdentifier resultSchemaIdentifier = 
SchemaIdentifier.builder()
+                .name(schemaName)
+                .id(schemaId.getAsLong())
+                .version(version.getAsInt())
+                .build();
+
+        final Tuple<SchemaIdentifier, String> tuple = new 
Tuple<>(resultSchemaIdentifier, schemaText);
         return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> {
             final Schema schema = new Schema.Parser().parse(schemaText);
-            return AvroTypeUtil.createSchema(schema, schemaText, 
schemaIdentifier);
+            return AvroTypeUtil.createSchema(schema, schemaText, 
resultSchemaIdentifier);
         });
     }
 
+    @Override
+    public RecordSchema retrieveSchema(final SchemaIdentifier 
schemaIdentifier) throws IOException, 
org.apache.nifi.schema.access.SchemaNotFoundException {
+        if (schemaIdentifier.getIdentifier().isPresent()) {
+            return retrieveSchemaByIdAndVersion(schemaIdentifier);
+        } else {
+            return retrieveSchemaByName(schemaIdentifier);
+        }
+    }
+
+    private String createErrorMessage(final String baseMessage, final 
Optional<String> schemaName, final Optional<String> branchName, final 
OptionalInt version) {
+        final StringBuilder builder = new StringBuilder(baseMessage)
+                .append(" with name '")
+                .append(schemaName.orElse("null"))
+                .append("'");
+
+        if (branchName.isPresent()) {
+            builder.append(" and branch 
'").append(branchName.get()).append("'");
+        }
+
+        if (version.isPresent()) {
+            builder.append(" and version 
'").append(version.getAsInt()).append("'");
+        }
+
+        return builder.toString();
+    }
+
     // The schema registry client wraps all IOExceptions in RuntimeException. 
So if an IOException occurs, we don't know
     // that it was an IO problem. So we will look through the Exception's 
cause chain to see if there is an IOException present.
     private void handleException(final String message, final Exception e) 
throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
index d34c9e8..7380bb2 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java
@@ -17,20 +17,16 @@
 
 package org.apache.nifi.schemaregistry.hortonworks;
 
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-
-import java.lang.reflect.Constructor;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.util.HashMap;
-import java.util.Map;
-
+import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import 
com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.util.MockConfigurationContext;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -39,12 +35,16 @@ import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.hortonworks.registries.schemaregistry.SchemaCompatibility;
-import com.hortonworks.registries.schemaregistry.SchemaMetadata;
-import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
-import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
-import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
-import 
com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import java.lang.reflect.Constructor;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 
 public class TestHortonworksSchemaRegistry {
     private HortonworksSchemaRegistry registry;
@@ -124,7 +124,8 @@ public class TestHortonworksSchemaRegistry {
         registry.enable(configurationContext);
 
         for (int i = 0; i < 10000; i++) {
-            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("unit-test").build();
+            final RecordSchema schema = 
registry.retrieveSchema(schemaIdentifier);
             assertNotNull(schema);
         }
 
@@ -161,7 +162,8 @@ public class TestHortonworksSchemaRegistry {
         registry.enable(configurationContext);
 
         for (int i = 0; i < 2; i++) {
-            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("unit-test").build();
+            final RecordSchema schema = 
registry.retrieveSchema(schemaIdentifier);
             assertNotNull(schema);
         }
 
@@ -170,7 +172,8 @@ public class TestHortonworksSchemaRegistry {
         Thread.sleep(2000L);
 
         for (int i = 0; i < 2; i++) {
-            final RecordSchema schema = registry.retrieveSchema("unit-test");
+            final SchemaIdentifier schemaIdentifier = 
SchemaIdentifier.builder().name("unit-test").build();
+            final RecordSchema schema = 
registry.retrieveSchema(schemaIdentifier);
             assertNotNull(schema);
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
index 502d548..b299191 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java
@@ -47,6 +47,8 @@ import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODE
 import static 
org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES;
 import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME;
+import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME;
+import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION;
 import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY;
 import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT;
@@ -79,6 +81,8 @@ public abstract class SchemaRegistryService extends 
AbstractControllerService {
 
         properties.add(SCHEMA_REGISTRY);
         properties.add(SCHEMA_NAME);
+        properties.add(SCHEMA_VERSION);
+        properties.add(SCHEMA_BRANCH_NAME);
         properties.add(SCHEMA_TEXT);
 
         return properties;

http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
index 88362b8..e785a70 100644
--- 
a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java
@@ -16,23 +16,24 @@
  */
 package org.apache.nifi.schemaregistry.services;
 
-import java.io.IOException;
-import java.util.Set;
-
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.schema.access.SchemaField;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 
+import java.io.IOException;
+import java.util.Set;
+
 /**
  * Represents {@link ControllerService} strategy to expose internal and/or
  * integrate with external Schema Registry
  */
 public interface SchemaRegistry extends ControllerService {
 
-
     /**
+     * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
+     *
      * Retrieves and returns the textual representation of the schema based on
      * the provided name of the schema available in Schema Registry.
      *
@@ -41,9 +42,17 @@ public interface SchemaRegistry extends ControllerService {
      * @throws IOException if unable to communicate with the backing store
      * @throws SchemaNotFoundException if unable to find the schema with the 
given name
      */
-    String retrieveSchemaText(String schemaName) throws IOException, 
SchemaNotFoundException;
+    default String retrieveSchemaText(String schemaName) throws IOException, 
SchemaNotFoundException {
+        final RecordSchema recordSchema = 
retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
+        if (recordSchema == null) {
+            throw new SchemaNotFoundException("Could not find schema with name 
'" + schemaName + "'");
+        }
+        return recordSchema.getSchemaText().get();
+    }
 
     /**
+     * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
+     *
      * Retrieves the textual representation of the schema with the given ID 
and version
      *
      * @param schemaId the unique identifier for the desired schema
@@ -53,25 +62,37 @@ public interface SchemaRegistry extends ControllerService {
      * @throws IOException if unable to communicate with the backing store
      * @throws SchemaNotFoundException if unable to find the schema with the 
given id and version
      */
-    String retrieveSchemaText(long schemaId, int version) throws IOException, 
SchemaNotFoundException;
+    default String retrieveSchemaText(long schemaId, int version) throws 
IOException, SchemaNotFoundException {
+        final RecordSchema recordSchema = 
retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
+        if (recordSchema == null) {
+            throw new SchemaNotFoundException("Could not find schema with ID 
'" + schemaId + "' and version '" + version + "'");
+        }
+        return recordSchema.getSchemaText().get();
+    }
 
     /**
+     * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
+     *
      * Retrieves and returns the RecordSchema based on the provided name of 
the schema available in Schema Registry. The RecordSchema
      * that is returned must have the Schema's name populated in its 
SchemaIdentifier. I.e., a call to
      * {@link RecordSchema}.{@link RecordSchema#getIdentifier() 
getIdentifier()}.{@link SchemaIdentifier#getName() getName()}
-     * will always return an {@link Optional} that is not empty.
+     * will always return an {@link java.util.Optional} that is not empty.
      *
      * @return the latest version of the schema with the given name, or 
<code>null</code> if no schema can be found with the given name.
      * @throws SchemaNotFoundException if unable to find the schema with the 
given name
      */
-    RecordSchema retrieveSchema(String schemaName) throws IOException, 
SchemaNotFoundException;
+    default RecordSchema retrieveSchema(String schemaName) throws IOException, 
SchemaNotFoundException {
+        return 
retrieveSchema(SchemaIdentifier.builder().name(schemaName).build());
+    }
 
 
     /**
+     * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead
+     *
      * Retrieves the schema with the given ID and version. The RecordSchema 
that is returned must have the Schema's identifier and version
      * populated in its SchemaIdentifier. I.e., a call to
      * {@link RecordSchema}.{@link RecordSchema#getIdentifier() 
getIdentifier()}.{@link SchemaIdentifier#getIdentifier() getIdentifier()}
-     * will always return an {@link Optional} that is not empty, as will a 
call to
+     * will always return an {@link java.util.Optional} that is not empty, as 
will a call to
      * {@link RecordSchema}.{@link RecordSchema#getIdentifier() 
getIdentifier()}.{@link SchemaIdentifier#getVersion() getVersion()}.
      *
      * @param schemaId the unique identifier for the desired schema
@@ -82,10 +103,23 @@ public interface SchemaRegistry extends ControllerService {
      * @throws IOException if unable to communicate with the backing store
      * @throws SchemaNotFoundException if unable to find the schema with the 
given id and version
      */
-    RecordSchema retrieveSchema(long schemaId, int version) throws 
IOException, SchemaNotFoundException;
+    default RecordSchema retrieveSchema(long schemaId, int version) throws 
IOException, SchemaNotFoundException {
+        return 
retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build());
+    }
+
+    /**
+     * Retrieves the schema based on the provided descriptor. The descriptor 
must contain an identifier or name, but not both, along
+     * with a version, and an optional branch name. For implementations that 
do not support branching, the branch name will be ignored.
+     *
+     * @param schemaIdentifier the identifier describing the schema to retrieve
+     * @return the schema for the given descriptor
+     * @throws IOException if unable to communicate with the backing store
+     * @throws SchemaNotFoundException if unable to find the schema based on 
the given descriptor
+     */
+    RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws 
IOException, SchemaNotFoundException;
 
     /**
-     * @return the set of all Schema Fields that are supplied by the 
RecordSchema that is returned from {@link #retrieveSchema(String)} and {@link 
#retrieveSchema(long, int)}
+     * @return the set of all Schema Fields that are supplied by the 
RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)}
      */
     Set<SchemaField> getSuppliedSchemaFields();
 }

Reply via email to