/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.schema.access;

import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.MockPropertyValue;
import org.junit.Test;

import java.io.IOException;
import java.util.Collections;

import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.argThat;
import static org.mockito.Mockito.when;

/**
 * Exercises {@link SchemaNamePropertyStrategy} with different combinations of the
 * schema name, branch and version property values.
 *
 * <p>Each positive test stubs the inherited {@code schemaRegistry} so that it only answers
 * for the exact {@link SchemaIdentifier} the strategy is expected to build, then checks that
 * {@code getSchema} yields a non-null schema. The {@code schemaRegistry} and
 * {@code recordSchema} fixtures come from {@link AbstractSchemaAccessStrategyTest}
 * (NOTE(review): their setup is not visible here - confirm against the base class).</p>
 */
public class TestSchemaNamePropertyStrategy extends AbstractSchemaAccessStrategyTest {

    /** Schema name shared by every test case. */
    private static final String SCHEMA_NAME = "person";

    /** Only the name is configured; the expected identifier carries the name alone. */
    @Test
    public void testNameOnly() throws SchemaNotFoundException, IOException {
        final SchemaIdentifier nameOnly = SchemaIdentifier.builder()
            .name(SCHEMA_NAME)
            .build();

        assertSchemaResolved(strategyFor(null, null), nameOnly);
    }

    /** Name plus a numeric version; the expected identifier carries both. */
    @Test
    public void testNameAndVersion() throws SchemaNotFoundException, IOException {
        final SchemaIdentifier nameAndVersion = SchemaIdentifier.builder()
            .name(SCHEMA_NAME)
            .version(Integer.valueOf(1))
            .build();

        assertSchemaResolved(strategyFor(null, "1"), nameAndVersion);
    }

    /** A whitespace-only version is expected to be ignored, leaving a name-only identifier. */
    @Test
    public void testNameAndBlankVersion() throws SchemaNotFoundException, IOException {
        final SchemaIdentifier nameOnly = SchemaIdentifier.builder()
            .name(SCHEMA_NAME)
            .build();

        assertSchemaResolved(strategyFor(null, "   "), nameOnly);
    }

    /** A version that is not a number must surface as a {@link SchemaNotFoundException}. */
    @Test(expected = SchemaNotFoundException.class)
    public void testNameAndNonNumericVersion() throws SchemaNotFoundException, IOException {
        // No registry stubbing here: the lookup is expected to fail with the declared exception.
        strategyFor(null, "XYZ").getSchema(Collections.emptyMap(), null, recordSchema);
    }

    /** Name plus a branch; the expected identifier carries both. */
    @Test
    public void testNameAndBranch() throws SchemaNotFoundException, IOException {
        final SchemaIdentifier nameAndBranch = SchemaIdentifier.builder()
            .name(SCHEMA_NAME)
            .branch("test")
            .build();

        assertSchemaResolved(strategyFor("test", null), nameAndBranch);
    }

    /** A whitespace-only branch is expected to be ignored, leaving a name-only identifier. */
    @Test
    public void testNameAndBlankBranch() throws SchemaNotFoundException, IOException {
        final SchemaIdentifier nameOnly = SchemaIdentifier.builder()
            .name(SCHEMA_NAME)
            .build();

        assertSchemaResolved(strategyFor("  ", null), nameOnly);
    }

    /**
     * Builds the strategy under test for {@link #SCHEMA_NAME} and the given raw property values.
     *
     * @param branch raw value of the branch property, may be {@code null}
     * @param version raw value of the version property, may be {@code null}
     * @return a strategy wired to the inherited {@code schemaRegistry}
     */
    private SchemaNamePropertyStrategy strategyFor(final String branch, final String version) {
        final PropertyValue nameProperty = new MockPropertyValue(SCHEMA_NAME);
        final PropertyValue branchProperty = new MockPropertyValue(branch);
        final PropertyValue versionProperty = new MockPropertyValue(version);

        return new SchemaNamePropertyStrategy(schemaRegistry, nameProperty, branchProperty, versionProperty);
    }

    /**
     * Stubs the registry to answer only for {@code expectedIdentifier}, then asserts that the
     * strategy resolves a non-null schema.
     *
     * @param strategy the strategy to invoke
     * @param expectedIdentifier the identifier the strategy is expected to pass to the registry
     * @throws SchemaNotFoundException declared by the stubbed registry call and by {@code getSchema}
     * @throws IOException declared by the stubbed registry call and by {@code getSchema}
     */
    private void assertSchemaResolved(final SchemaNamePropertyStrategy strategy, final SchemaIdentifier expectedIdentifier)
            throws SchemaNotFoundException, IOException {
        when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedIdentifier))))
            .thenReturn(recordSchema);

        final RecordSchema resolved = strategy.getSchema(Collections.emptyMap(), null, recordSchema);
        assertNotNull(resolved);
    }

}
http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java index 785d729..1b7ee8b 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/main/java/org/apache/nifi/schemaregistry/services/AvroSchemaRegistry.java @@ -16,31 +16,14 @@ */ package org.apache.nifi.schemaregistry.services; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.stream.Collectors; - import org.apache.avro.Schema; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; -import org.apache.nifi.annotation.lifecycle.OnDisabled; -import org.apache.nifi.annotation.lifecycle.OnEnabled; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.controller.ConfigurationContext; import 
org.apache.nifi.controller.ControllerServiceInitializationContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.reporting.InitializationException; @@ -49,13 +32,24 @@ import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + @Tags({"schema", "registry", "avro", "json", "csv"}) @CapabilityDescription("Provides a service for registering and accessing schemas. You can register a schema " + "as a dynamic property where 'name' represents the schema name and 'value' represents the textual " + "representation of the actual schema following the syntax and semantics of Avro's Schema format.") public class AvroSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); - private final Map<String, String> schemaNameToSchemaMap; private final ConcurrentMap<String, RecordSchema> recordSchemas = new ConcurrentHashMap<>(); static final PropertyDescriptor VALIDATE_FIELD_NAMES = new PropertyDescriptor.Builder() @@ -70,9 +64,6 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch private List<PropertyDescriptor> propertyDescriptors = new ArrayList<>(); - public AvroSchemaRegistry() { - this.schemaNameToSchemaMap = new HashMap<>(); - } @Override protected void init(ControllerServiceInitializationContext config) throws InitializationException { @@ -92,7 +83,7 @@ public class AvroSchemaRegistry extends 
AbstractControllerService implements Sch try { // Use a non-strict parser here, a strict parse can be done (if specified) in customValidate(). final Schema avroSchema = new Schema.Parser().setValidate(false).parse(newValue); - final SchemaIdentifier schemaId = SchemaIdentifier.ofName(descriptor.getName()); + final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(descriptor.getName()).build(); final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, newValue, schemaId); recordSchemas.put(descriptor.getName(), recordSchema); } catch (final Exception e) { @@ -127,18 +118,7 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch return results; } - @Override - public String retrieveSchemaText(final String schemaName) throws SchemaNotFoundException { - final String schemaText = schemaNameToSchemaMap.get(schemaName); - if (schemaText == null) { - throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'"); - } - - return schemaText; - } - - @Override - public RecordSchema retrieveSchema(final String schemaName) throws SchemaNotFoundException { + private RecordSchema retrieveSchemaByName(final String schemaName) throws SchemaNotFoundException { final RecordSchema recordSchema = recordSchemas.get(schemaName); if (recordSchema == null) { throw new SchemaNotFoundException("Unable to find schema with name '" + schemaName + "'"); @@ -147,26 +127,13 @@ public class AvroSchemaRegistry extends AbstractControllerService implements Sch } @Override - public RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException { - throw new SchemaNotFoundException("This Schema Registry does not support schema lookup by identifier and version - only by name."); - } - - @Override - public String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException { - throw new SchemaNotFoundException("This Schema Registry does not support schema lookup 
by identifier and version - only by name."); - } - - @OnDisabled - public void close() throws Exception { - schemaNameToSchemaMap.clear(); - } - - - @OnEnabled - public void enable(final ConfigurationContext configurationContext) throws InitializationException { - this.schemaNameToSchemaMap.putAll(configurationContext.getProperties().entrySet().stream() - .filter(propEntry -> propEntry.getKey().isDynamic()) - .collect(Collectors.toMap(propEntry -> propEntry.getKey().getName(), propEntry -> propEntry.getValue()))); + public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final Optional<String> schemaName = schemaIdentifier.getName(); + if (schemaName.isPresent()) { + return retrieveSchemaByName(schemaName.get()); + } else { + throw new SchemaNotFoundException("This Schema Registry only supports retrieving a schema by name."); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java index 67a0959..0d72d81 100644 --- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/src/test/java/org/apache/nifi/schemaregistry/services/TestAvroSchemaRegistry.java @@ -31,6 +31,8 @@ import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.controller.ConfigurationContext; 
import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.junit.Assert; import org.junit.Test; @@ -39,8 +41,7 @@ public class TestAvroSchemaRegistry { @Test public void validateSchemaRegistrationFromrDynamicProperties() throws Exception { String schemaName = "fooSchema"; - ConfigurationContext configContext = mock(ConfigurationContext.class); - Map<PropertyDescriptor, String> properties = new HashMap<>(); + PropertyDescriptor fooSchema = new PropertyDescriptor.Builder() .name(schemaName) .dynamic(true) @@ -54,20 +55,20 @@ public class TestAvroSchemaRegistry { .name("barSchema") .dynamic(false) .build(); - properties.put(fooSchema, fooSchemaText); - properties.put(barSchema, ""); - when(configContext.getProperties()).thenReturn(properties); + AvroSchemaRegistry delegate = new AvroSchemaRegistry(); - delegate.enable(configContext); - String locatedSchemaText = delegate.retrieveSchemaText(schemaName); - assertEquals(fooSchemaText, locatedSchemaText); + delegate.onPropertyModified(fooSchema, null, fooSchemaText); + delegate.onPropertyModified(barSchema, null, ""); + + SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build(); + RecordSchema locatedSchema = delegate.retrieveSchema(schemaIdentifier); + assertEquals(fooSchemaText, locatedSchema.getSchemaText().get()); try { - delegate.retrieveSchemaText("barSchema"); + delegate.retrieveSchema(SchemaIdentifier.builder().name("barSchema").build()); Assert.fail("Expected a SchemaNotFoundException to be thrown but it was not"); } catch (final SchemaNotFoundException expected) { } - delegate.close(); } @Test @@ -109,7 +110,5 @@ public class TestAvroSchemaRegistry { when(propertyValue.asBoolean()).thenReturn(false); results = delegate.customValidate(validationContext); results.forEach(result -> assertTrue(result.isValid())); - - delegate.close(); } } 
http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java index 5dbc305..06d7db7 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java @@ -18,19 +18,6 @@ package org.apache.nifi.processors.standard; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; - import org.apache.avro.Schema; import org.apache.avro.Schema.Parser; import org.apache.nifi.annotation.behavior.EventDriven; @@ -70,10 +57,24 @@ import org.apache.nifi.serialization.WriteResult; import org.apache.nifi.serialization.record.RawRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.serialization.record.validation.RecordSchemaValidator; import org.apache.nifi.serialization.record.validation.SchemaValidationResult; import org.apache.nifi.serialization.record.validation.ValidationError; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; 
+import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + @EventDriven @SideEffectFree @SupportsBatching @@ -450,7 +451,8 @@ public class ValidateRecord extends AbstractProcessor { } else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) { final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class); final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue(); - return schemaRegistry.retrieveSchema(schemaName); + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name(schemaName).build(); + return schemaRegistry.retrieveSchema(schemaIdentifier); } else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) { final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue(); final Parser parser = new Schema.Parser(); http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml index ab40507..0c656db 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml @@ -28,7 +28,7 @@ limitations under the License. 
<artifactId>nifi-hwx-schema-registry-service</artifactId> <packaging>jar</packaging> <properties> - <hwx.registry.version>0.3.0</hwx.registry.version> + <hwx.registry.version>0.5.1</hwx.registry.version> <jackson.version>2.9.1</jackson.version> </properties> <dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index f37c927..88325ad 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -16,23 +16,19 @@ */ package org.apache.nifi.schemaregistry.hortonworks; -import java.io.IOException; -import java.util.ArrayList; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; - +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import 
com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionKey; +import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import org.apache.avro.Schema; -import org.apache.nifi.avro.AvroTypeUtil; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnDisabled; import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.AbstractControllerService; import org.apache.nifi.controller.ConfigurationContext; @@ -44,21 +40,28 @@ import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.Tuple; -import com.hortonworks.registries.schemaregistry.SchemaMetadata; -import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; -import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; -import com.hortonworks.registries.schemaregistry.SchemaVersionKey; -import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; -import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; @Tags({"schema", "registry", "avro", "hortonworks", "hwx"}) @CapabilityDescription("Provides a Schema Registry Service that 
interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry") public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { - private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT, + private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); private final ConcurrentMap<Tuple<SchemaIdentifier, String>, RecordSchema> schemaNameToSchemaMap = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>(); + private final ConcurrentMap<Tuple<String,String>, Tuple<SchemaVersionInfo, Long>> schemaVersionByNameCache = new ConcurrentHashMap<>(); private final ConcurrentMap<SchemaVersionKey, Tuple<SchemaVersionInfo, Long>> schemaVersionByKeyCache = new ConcurrentHashMap<>(); private volatile long versionInfoCacheNanos; @@ -149,10 +152,13 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme return schemaRegistryClient; } - private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { + + private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName, final String branchName) + throws org.apache.nifi.schema.access.SchemaNotFoundException { try { // Try to fetch the SchemaVersionInfo from the cache. 
- final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(schemaName); + final Tuple<String,String> nameAndBranch = new Tuple<>(schemaName, branchName); + final Tuple<SchemaVersionInfo, Long> timestampedVersionInfo = schemaVersionByNameCache.get(nameAndBranch); // Determine if the timestampedVersionInfo is expired boolean fetch = false; @@ -169,14 +175,20 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme } // schema version info was expired or not found in cache. Fetch from schema registry - final SchemaVersionInfo versionInfo = client.getLatestSchemaVersionInfo(schemaName); + final SchemaVersionInfo versionInfo; + if (StringUtils.isBlank(branchName)) { + versionInfo = client.getLatestSchemaVersionInfo(schemaName); + } else { + versionInfo = client.getLatestSchemaVersionInfo(branchName, schemaName); + } + if (versionInfo == null) { throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); } // Store new version in cache. 
final Tuple<SchemaVersionInfo, Long> tuple = new Tuple<>(versionInfo, System.nanoTime()); - schemaVersionByNameCache.put(schemaName, tuple); + schemaVersionByNameCache.put(nameAndBranch, tuple); return versionInfo; } catch (final SchemaNotFoundException e) { throw new org.apache.nifi.schema.access.SchemaNotFoundException(e); @@ -217,23 +229,23 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme } } - @Override - public String retrieveSchemaText(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException { - final SchemaVersionInfo latest = getLatestSchemaVersionInfo(getClient(), schemaName); - return latest.getSchemaText(); - } - + private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException { - @Override - public RecordSchema retrieveSchema(final String schemaName) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException { final SchemaRegistryClient client = getClient(); final SchemaVersionInfo versionInfo; final Long schemaId; - final Integer version; + + final Optional<String> schemaName = schemaIdentifier.getName(); + if (!schemaName.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present"); + } + + final Optional<String> schemaBranchName = schemaIdentifier.getBranch(); + final OptionalInt schemaVersion = schemaIdentifier.getVersion(); try { - final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName); + final SchemaMetadataInfo metadataInfo = client.getSchemaMetadataInfo(schemaName.get()); if (metadataInfo == null) { throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); } @@ -243,61 +255,59 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); } - versionInfo = getLatestSchemaVersionInfo(client, schemaName); - version = versionInfo.getVersion(); - if (version == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + // possible scenarios are name only, name + branch, or name + version + if (schemaVersion.isPresent()) { + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName.get(), schemaVersion.getAsInt()); + versionInfo = getSchemaVersionInfo(client, schemaVersionKey); + } else { + versionInfo = getLatestSchemaVersionInfo(client, schemaName.get(), schemaBranchName.orElse(null)); + } + + if (versionInfo == null || versionInfo.getVersion() == null) { + final String message = createErrorMessage("Could not find schema", schemaName, schemaBranchName, schemaVersion); + throw new org.apache.nifi.schema.access.SchemaNotFoundException(message); } + } catch (final Exception e) { - handleException("Failed to retrieve schema with name '" + schemaName + "'", e); + final String message = createErrorMessage("Failed to retrieve schema", schemaName, schemaBranchName, schemaVersion); + handleException(message, e); return null; } final String schemaText = versionInfo.getSchemaText(); - final SchemaIdentifier schemaIdentifier = (schemaId == null || version == null) ? 
SchemaIdentifier.ofName(schemaName) : SchemaIdentifier.of(schemaName, schemaId, version); - final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); + final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder() + .id(schemaId) + .name(schemaName.get()) + .branch(schemaBranchName.orElse(null)) + .version(versionInfo.getVersion()) + .build(); + + final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier); }); } - - @Override - public String retrieveSchemaText(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException { + private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException { final SchemaRegistryClient client = getClient(); - try { - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); - if (info == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } - - final SchemaMetadata metadata = info.getSchemaMetadata(); - final String schemaName = metadata.getName(); - - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); - final SchemaVersionInfo versionInfo = getSchemaVersionInfo(client, schemaVersionKey); - if (versionInfo == null) { - throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); - } + final String schemaName; + final SchemaVersionInfo versionInfo; - return versionInfo.getSchemaText(); - } catch (final 
Exception e) { - handleException("Failed to retrieve schema with ID '" + schemaId + "' and version '" + version + "'", e); - return null; + final OptionalLong schemaId = schemaIdentifier.getIdentifier(); + if (!schemaId.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present"); } - } - @Override - public RecordSchema retrieveSchema(final long schemaId, final int version) throws org.apache.nifi.schema.access.SchemaNotFoundException, IOException { - final SchemaRegistryClient client = getClient(); + final OptionalInt version = schemaIdentifier.getVersion(); + if (!version.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present"); + } - final String schemaName; - final SchemaVersionInfo versionInfo; try { - final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId); + final SchemaMetadataInfo info = client.getSchemaMetadataInfo(schemaId.getAsLong()); if (info == null) { throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); } @@ -305,7 +315,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final SchemaMetadata metadata = info.getSchemaMetadata(); schemaName = metadata.getName(); - final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version); + final SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaName, version.getAsInt()); versionInfo = getSchemaVersionInfo(client, schemaVersionKey); if (versionInfo == null) { throw new org.apache.nifi.schema.access.SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); @@ -317,14 +327,45 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme final String schemaText = versionInfo.getSchemaText(); - final 
SchemaIdentifier schemaIdentifier = SchemaIdentifier.of(schemaName, schemaId, version); - final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(schemaIdentifier, schemaText); + final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder() + .name(schemaName) + .id(schemaId.getAsLong()) + .version(version.getAsInt()) + .build(); + + final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText); return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { final Schema schema = new Schema.Parser().parse(schemaText); - return AvroTypeUtil.createSchema(schema, schemaText, schemaIdentifier); + return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier); }); } + @Override + public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { + if (schemaIdentifier.getIdentifier().isPresent()) { + return retrieveSchemaByIdAndVersion(schemaIdentifier); + } else { + return retrieveSchemaByName(schemaIdentifier); + } + } + + private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) { + final StringBuilder builder = new StringBuilder(baseMessage) + .append(" with name '") + .append(schemaName.orElse("null")) + .append("'"); + + if (branchName.isPresent()) { + builder.append(" and branch '").append(branchName.get()).append("'"); + } + + if (version.isPresent()) { + builder.append(" and version '").append(version.getAsInt()).append("'"); + } + + return builder.toString(); + } + // The schema registry client wraps all IOExceptions in RuntimeException. So if an IOException occurs, we don't know // that it was an IO problem. So we will look through the Exception's cause chain to see if there is an IOException present. 
private void handleException(final String message, final Exception e) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java index d34c9e8..7380bb2 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/test/java/org/apache/nifi/schemaregistry/hortonworks/TestHortonworksSchemaRegistry.java @@ -17,20 +17,16 @@ package org.apache.nifi.schemaregistry.hortonworks; -import static org.junit.Assert.assertNotNull; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; - -import java.lang.reflect.Constructor; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.HashMap; -import java.util.Map; - +import com.hortonworks.registries.schemaregistry.SchemaCompatibility; +import com.hortonworks.registries.schemaregistry.SchemaMetadata; +import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; +import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; +import 
com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; +import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.controller.ConfigurationContext; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.util.MockConfigurationContext; import org.junit.Before; import org.junit.Ignore; @@ -39,12 +35,16 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.hortonworks.registries.schemaregistry.SchemaCompatibility; -import com.hortonworks.registries.schemaregistry.SchemaMetadata; -import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; -import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; -import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient; -import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException; +import java.lang.reflect.Constructor; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; public class TestHortonworksSchemaRegistry { private HortonworksSchemaRegistry registry; @@ -124,7 +124,8 @@ public class TestHortonworksSchemaRegistry { registry.enable(configurationContext); for (int i = 0; i < 10000; i++) { - final RecordSchema schema = registry.retrieveSchema("unit-test"); + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build(); + final RecordSchema schema = registry.retrieveSchema(schemaIdentifier); assertNotNull(schema); } @@ -161,7 +162,8 @@ public class TestHortonworksSchemaRegistry { registry.enable(configurationContext); for (int i = 0; i < 2; 
i++) { - final RecordSchema schema = registry.retrieveSchema("unit-test"); + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build(); + final RecordSchema schema = registry.retrieveSchema(schemaIdentifier); assertNotNull(schema); } @@ -170,7 +172,8 @@ public class TestHortonworksSchemaRegistry { Thread.sleep(2000L); for (int i = 0; i < 2; i++) { - final RecordSchema schema = registry.retrieveSchema("unit-test"); + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("unit-test").build(); + final RecordSchema schema = registry.retrieveSchema(schemaIdentifier); assertNotNull(schema); } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java index 502d548..b299191 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/serialization/SchemaRegistryService.java @@ -47,6 +47,8 @@ import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_CONTENT_ENCODE import static org.apache.nifi.schema.access.SchemaAccessUtils.HWX_SCHEMA_REF_ATTRIBUTES; import static 
org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_BRANCH_NAME; +import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_VERSION; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_NAME_PROPERTY; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_REGISTRY; import static org.apache.nifi.schema.access.SchemaAccessUtils.SCHEMA_TEXT; @@ -79,6 +81,8 @@ public abstract class SchemaRegistryService extends AbstractControllerService { properties.add(SCHEMA_REGISTRY); properties.add(SCHEMA_NAME); + properties.add(SCHEMA_VERSION); + properties.add(SCHEMA_BRANCH_NAME); properties.add(SCHEMA_TEXT); return properties; http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java index 88362b8..e785a70 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-schema-registry-service-api/src/main/java/org/apache/nifi/schemaregistry/services/SchemaRegistry.java @@ -16,23 +16,24 @@ */ package org.apache.nifi.schemaregistry.services; -import java.io.IOException; -import java.util.Set; - import org.apache.nifi.controller.ControllerService; import org.apache.nifi.schema.access.SchemaField; import 
org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.serialization.record.RecordSchema; import org.apache.nifi.serialization.record.SchemaIdentifier; +import java.io.IOException; +import java.util.Set; + /** * Represents {@link ControllerService} strategy to expose internal and/or * integrate with external Schema Registry */ public interface SchemaRegistry extends ControllerService { - /** + * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead + * * Retrieves and returns the textual representation of the schema based on * the provided name of the schema available in Schema Registry. * @@ -41,9 +42,17 @@ public interface SchemaRegistry extends ControllerService { * @throws IOException if unable to communicate with the backing store * @throws SchemaNotFoundException if unable to find the schema with the given name */ - String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException; + default String retrieveSchemaText(String schemaName) throws IOException, SchemaNotFoundException { + final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().name(schemaName).build()); + if (recordSchema == null) { + throw new SchemaNotFoundException("Could not find schema with name '" + schemaName + "'"); + } + return recordSchema.getSchemaText().get(); + } /** + * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead + * * Retrieves the textual representation of the schema with the given ID and version * * @param schemaId the unique identifier for the desired schema @@ -53,25 +62,37 @@ public interface SchemaRegistry extends ControllerService { * @throws IOException if unable to communicate with the backing store * @throws SchemaNotFoundException if unable to find the schema with the given id and version */ - String retrieveSchemaText(long schemaId, int version) throws IOException, SchemaNotFoundException; + default String retrieveSchemaText(long schemaId, int version) throws IOException, 
SchemaNotFoundException { + final RecordSchema recordSchema = retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build()); + if (recordSchema == null) { + throw new SchemaNotFoundException("Could not find schema with ID '" + schemaId + "' and version '" + version + "'"); + } + return recordSchema.getSchemaText().get(); + } /** + * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead + * * Retrieves and returns the RecordSchema based on the provided name of the schema available in Schema Registry. The RecordSchema * that is returned must have the Schema's name populated in its SchemaIdentifier. I.e., a call to * {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getName() getName()} - * will always return an {@link Optional} that is not empty. + * will always return an {@link java.util.Optional} that is not empty. * * @return the latest version of the schema with the given name, or <code>null</code> if no schema can be found with the given name. * @throws SchemaNotFoundException if unable to find the schema with the given name */ - RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException; + default RecordSchema retrieveSchema(String schemaName) throws IOException, SchemaNotFoundException { + return retrieveSchema(SchemaIdentifier.builder().name(schemaName).build()); + } /** + * @deprecated Use {@link #retrieveSchema(SchemaIdentifier)} instead + * * Retrieves the schema with the given ID and version. The RecordSchema that is returned must have the Schema's identifier and version * populated in its SchemaIdentifier. 
I.e., a call to * {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getIdentifier() getIdentifier()} - * will always return an {@link Optional} that is not empty, as will a call to + * will always return an {@link java.util.Optional} that is not empty, as will a call to * {@link RecordSchema}.{@link RecordSchema#getIdentifier() getIdentifier()}.{@link SchemaIdentifier#getVersion() getVersion()}. * * @param schemaId the unique identifier for the desired schema @@ -82,10 +103,23 @@ public interface SchemaRegistry extends ControllerService { * @throws IOException if unable to communicate with the backing store * @throws SchemaNotFoundException if unable to find the schema with the given id and version */ - RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException; + default RecordSchema retrieveSchema(long schemaId, int version) throws IOException, SchemaNotFoundException { + return retrieveSchema(SchemaIdentifier.builder().id(schemaId).version(version).build()); + } + + /** + * Retrieves the schema based on the provided descriptor. The descriptor must contain an id or name, but not both, along + * with a version, and an optional branch name. For implementations that do not support branching, the branch name will be ignored. 
+ * + * @param schemaIdentifier the identifier of the schema to retrieve + * @return the schema for the given descriptor + * @throws IOException if unable to communicate with the backing store + * @throws SchemaNotFoundException if unable to find the schema based on the given descriptor + */ + RecordSchema retrieveSchema(SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException; /** - * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(String)} and {@link #retrieveSchema(long, int)} + * @return the set of all Schema Fields that are supplied by the RecordSchema that is returned from {@link #retrieveSchema(SchemaIdentifier)} */ Set<SchemaField> getSuppliedSchemaFields(); }
