NIFI-4935 Refactoring to support specifying schema branch or schema version when using schema by name strategy
This closes #2523. Signed-off-by: Mark Payne <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/de71a41b Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/de71a41b Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/de71a41b Branch: refs/heads/master Commit: de71a41bd0d2d8fd90884d8a246ef4c537d48196 Parents: 930417b Author: Bryan Bende <[email protected]> Authored: Mon Mar 5 17:02:20 2018 -0500 Committer: Mark Payne <[email protected]> Committed: Thu Mar 15 16:16:12 2018 -0400 ---------------------------------------------------------------------- .../apache/nifi/schema/access/SchemaField.java | 3 +- .../schema/access/SchemaNotFoundException.java | 2 +- .../serialization/record/SchemaIdentifier.java | 28 ++- .../record/StandardSchemaIdentifier.java | 60 +++++- .../schemaregistry/ConfluentSchemaRegistry.java | 63 ++++--- .../client/RestSchemaRegistryClient.java | 2 +- .../nifi/schema/access/SchemaAccessUtils.java | 55 +++++- .../record/MockSchemaRegistry.java | 59 +++--- .../nifi-standard-record-utils/pom.xml | 6 + .../access/ConfluentSchemaRegistryStrategy.java | 17 +- .../access/ConfluentSchemaRegistryWriter.java | 10 +- ...onworksAttributeSchemaReferenceStrategy.java | 5 +- ...rtonworksAttributeSchemaReferenceWriter.java | 11 +- ...rtonworksEncodedSchemaReferenceStrategy.java | 4 +- ...HortonworksEncodedSchemaReferenceWriter.java | 10 +- .../schema/access/SchemaNameAsAttribute.java | 28 ++- .../access/SchemaNamePropertyStrategy.java | 34 +++- .../AbstractSchemaAccessStrategyTest.java | 51 +++++ .../schema/access/SchemaIdentifierMatcher.java | 42 +++++ .../TestConfluentSchemaRegistryStrategy.java | 81 ++++++++ .../TestConfluentSchemaRegistryWriter.java | 76 ++++++++ ...onworksAttributeSchemaReferenceStrategy.java | 64 +++++++ ...rtonworksAttributeSchemaReferenceWriter.java | 99 ++++++++++ ...rtonworksEncodedSchemaReferenceStrategy.java | 87 +++++++++ 
...HortonworksEncodedSchemaReferenceWriter.java | 12 +- .../access/TestSchemaNameAsAttribute.java | 90 +++++++++ .../access/TestSchemaNamePropertyStrategy.java | 148 +++++++++++++++ .../services/AvroSchemaRegistry.java | 75 +++----- .../services/TestAvroSchemaRegistry.java | 23 ++- .../processors/standard/ValidateRecord.java | 30 +-- .../nifi-hwx-schema-registry-service/pom.xml | 2 +- .../hortonworks/HortonworksSchemaRegistry.java | 189 +++++++++++-------- .../TestHortonworksSchemaRegistry.java | 43 +++-- .../serialization/SchemaRegistryService.java | 4 + .../schemaregistry/services/SchemaRegistry.java | 56 ++++-- 35 files changed, 1276 insertions(+), 293 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java index 2fe06f4..1844eea 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java @@ -22,7 +22,8 @@ public enum SchemaField { SCHEMA_TEXT_FORMAT("Schema Text Format"), SCHEMA_NAME("Schema Name"), SCHEMA_IDENTIFIER("Schema Identifier"), - SCHEMA_VERSION("Schema Version"); + SCHEMA_VERSION("Schema Version"), + SCHEMA_BRANCH_NAME("Schema Branch Name"); private final String description; http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java ---------------------------------------------------------------------- diff --git 
a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java index 9a064ff..377a176 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaNotFoundException.java @@ -23,7 +23,7 @@ public class SchemaNotFoundException extends Exception { } public SchemaNotFoundException(final String message, final Throwable cause) { - super(cause); + super(message, cause); } public SchemaNotFoundException(final Throwable cause) { http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java index d7f5664..bca408a 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java @@ -38,14 +38,32 @@ public interface SchemaIdentifier { */ OptionalInt getVersion(); + /** + * @return the name of the branch where the schema is located, if one has been defined + */ + Optional<String> getBranch(); + - public static SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null); + SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null); - public static SchemaIdentifier ofName(final String name) { - return new StandardSchemaIdentifier(name, null, null); + static Builder builder() { + return new StandardSchemaIdentifier.Builder(); } - public static SchemaIdentifier of(final 
String name, final long identifier, final int version) { - return new StandardSchemaIdentifier(name, identifier, version); + /** + * Implementations should provide a builder to create instances of the SchemaIdentifier. + */ + interface Builder { + + Builder name(String name); + + Builder id(Long id); + + Builder version(Integer version); + + Builder branch(String branch); + + SchemaIdentifier build(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java index 86db284..712486b 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java @@ -25,11 +25,17 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { private final Optional<String> name; private final OptionalLong identifier; private final OptionalInt version; + private final Optional<String> branch; - StandardSchemaIdentifier(final String name, final Long identifier, final Integer version) { + StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) { this.name = Optional.ofNullable(name); this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);; this.version = version == null ? 
OptionalInt.empty() : OptionalInt.of(version);; + this.branch = Optional.ofNullable(branch); + + if (this.name == null && this.identifier == null) { + throw new IllegalStateException("Name or Identifier must be provided"); + } } @Override @@ -48,8 +54,13 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { } @Override + public Optional<String> getBranch() { + return branch; + } + + @Override public int hashCode() { - return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode(); + return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode(); } @Override @@ -64,6 +75,49 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { return false; } final SchemaIdentifier other = (SchemaIdentifier) obj; - return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion()); + return getName().equals(other.getName()) + && getIdentifier().equals(other.getIdentifier()) + && getVersion().equals(other.getVersion()) + && getBranch().equals(other.getBranch()); + } + + /** + * Builder to create instances of SchemaIdentifier. 
+ */ + public static class Builder implements SchemaIdentifier.Builder { + + private String name; + private String branch; + private Long identifier; + private Integer version; + + @Override + public SchemaIdentifier.Builder name(final String name) { + this.name = name; + return this; + } + + @Override + public SchemaIdentifier.Builder id(final Long id) { + this.identifier = id; + return this; + } + + @Override + public SchemaIdentifier.Builder version(final Integer version) { + this.version = version; + return this; + } + + @Override + public SchemaIdentifier.Builder branch(final String branch) { + this.branch = branch; + return this; + } + + @Override + public SchemaIdentifier build() { + return new StandardSchemaIdentifier(name, identifier, version, branch); + } } } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java index 113e096..3558286 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java @@ -17,19 +17,6 @@ package org.apache.nifi.confluent.schemaregistry; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import 
java.util.Collections; -import java.util.EnumSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -import javax.net.ssl.SSLContext; - import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnEnabled; @@ -47,9 +34,24 @@ import org.apache.nifi.schema.access.SchemaField; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.ssl.SSLContextService; import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + @Tags({"schema", "registry", "confluent", "avro", "kafka"}) @@ -176,28 +178,33 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement return baseUrls; } - @Override - public String retrieveSchemaText(final String schemaName) throws IOException, SchemaNotFoundException { - final RecordSchema schema = retrieveSchema(schemaName); - return schema.getSchemaText().get(); - } + private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final Optional<String> schemaName = schemaIdentifier.getName(); + if (!schemaName.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present"); 
+ } - @Override - public String retrieveSchemaText(final long schemaId, final int version) throws IOException, SchemaNotFoundException { - final RecordSchema schema = retrieveSchema(schemaId, version); - return schema.getSchemaText().get(); + final RecordSchema schema = client.getSchema(schemaName.get()); + return schema; } - @Override - public RecordSchema retrieveSchema(final String schemaName) throws IOException, SchemaNotFoundException { - final RecordSchema schema = client.getSchema(schemaName); + private RecordSchema retrieveSchemaById(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final OptionalLong schemaId = schemaIdentifier.getIdentifier(); + if (!schemaId.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present"); + } + + final RecordSchema schema = client.getSchema((int) schemaId.getAsLong()); return schema; } @Override - public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, SchemaNotFoundException { - final RecordSchema schema = client.getSchema((int) schemaId); - return schema; + public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + if (schemaIdentifier.getName().isPresent()) { + return retrieveSchemaByName(schemaIdentifier); + } else { + return retrieveSchemaById(schemaIdentifier); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java index b2ad19b..14e3e83 100644 --- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java +++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java @@ -145,7 +145,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient { try { final Schema avroSchema = new Schema.Parser().parse(schemaText); - final SchemaIdentifier schemaId = SchemaIdentifier.of(subject, id, version); + final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build(); final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId); return recordSchema; http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java index 4f0f945..17aef2a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaAccessUtils.java @@ -16,19 +16,20 @@ 
*/ package org.apache.nifi.schema.access; -import java.util.Collection; -import java.util.Collections; -import java.util.List; - import org.apache.nifi.avro.AvroSchemaValidator; import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.context.PropertyContext; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + public class SchemaAccessUtils { public static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name", "Use 'Schema Name' Property", @@ -77,6 +78,26 @@ public class SchemaAccessUtils { .required(false) .build(); + public static final PropertyDescriptor SCHEMA_BRANCH_NAME = new PropertyDescriptor.Builder() + .name("schema-branch") + .displayName("Schema Branch") + .description("Specifies the name of the branch to use when looking up the schema in the Schema Registry property. " + + "If the chosen Schema Registry does not support branching, this value will be ignored.") + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + + public static final PropertyDescriptor SCHEMA_VERSION = new PropertyDescriptor.Builder() + .name("schema-version") + .displayName("Schema Version") + .description("Specifies the version of the schema to lookup in the Schema Registry. 
" + + "If not specified then the latest version of the schema will be retrieved.") + .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) + .expressionLanguageSupported(true) + .required(false) + .build(); + public static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder() .name("schema-text") .displayName("Schema Text") @@ -89,12 +110,15 @@ public class SchemaAccessUtils { public static Collection<ValidationResult> validateSchemaAccessStrategy(final ValidationContext validationContext, final String schemaAccessStrategyValue, final List<AllowableValue> schemaAccessStrategyValues) { + + final Collection<ValidationResult> validationResults = new ArrayList<>(); + if (isSchemaRegistryRequired(schemaAccessStrategyValue)) { final boolean registrySet = validationContext.getProperty(SCHEMA_REGISTRY).isSet(); if (!registrySet) { final String schemaAccessStrategyName = getSchemaAccessStrategyName(schemaAccessStrategyValue, schemaAccessStrategyValues); - return Collections.singleton(new ValidationResult.Builder() + validationResults.add(new ValidationResult.Builder() .subject("Schema Registry") .explanation("The '" + schemaAccessStrategyName + "' Schema Access Strategy requires that the Schema Registry property be set.") .valid(false) @@ -102,7 +126,21 @@ public class SchemaAccessUtils { } } - return Collections.emptyList(); + // ensure that only branch or version is specified, but not both + if (SCHEMA_NAME_PROPERTY.getValue().equalsIgnoreCase(schemaAccessStrategyValue)) { + final boolean branchNameSet = validationContext.getProperty(SCHEMA_BRANCH_NAME).isSet(); + final boolean versionSet = validationContext.getProperty(SCHEMA_VERSION).isSet(); + + if (branchNameSet && versionSet) { + validationResults.add(new ValidationResult.Builder() + .subject(SCHEMA_BRANCH_NAME.getDisplayName()) + .explanation(SCHEMA_BRANCH_NAME.getDisplayName() + " and " + SCHEMA_VERSION.getDisplayName() + " cannot be specified together") + .valid(false) + .build()); + } + } 
+ + return validationResults; } private static String getSchemaAccessStrategyName(final String schemaAccessValue, final List<AllowableValue> schemaAccessStrategyValues) { @@ -123,7 +161,10 @@ public class SchemaAccessUtils { public static SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final PropertyContext context) { if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_PROPERTY.getValue())) { - return new SchemaNamePropertyStrategy(schemaRegistry, context.getProperty(SCHEMA_NAME)); + final PropertyValue schemaName = context.getProperty(SCHEMA_NAME); + final PropertyValue schemaBranchName = context.getProperty(SCHEMA_BRANCH_NAME); + final PropertyValue schemaVersion = context.getProperty(SCHEMA_VERSION); + return new SchemaNamePropertyStrategy(schemaRegistry, schemaName, schemaBranchName, schemaVersion); } else if (allowableValue.equalsIgnoreCase(INHERIT_RECORD_SCHEMA.getValue())) { return new InheritSchemaFromRecord(); } else if (allowableValue.equalsIgnoreCase(SCHEMA_TEXT_PROPERTY.getValue())) { http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java index 36bbe58..a5ec246 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-mock-record-utils/src/main/java/org/apache/nifi/serialization/record/MockSchemaRegistry.java @@ -17,19 +17,21 @@ package org.apache.nifi.serialization.record; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.schema.access.SchemaField; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.util.Tuple; + import java.io.IOException; import java.util.EnumSet; import java.util.Optional; +import java.util.OptionalInt; +import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import org.apache.nifi.controller.AbstractControllerService; -import org.apache.nifi.schema.access.SchemaField; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.util.Tuple; - public class MockSchemaRegistry extends AbstractControllerService implements SchemaRegistry { private final ConcurrentMap<String, RecordSchema> schemaNameMap = new ConcurrentHashMap<>(); private final ConcurrentMap<Tuple<Long, Integer>, RecordSchema> schemaIdVersionMap = new ConcurrentHashMap<>(); @@ -38,39 +40,38 @@ public class MockSchemaRegistry extends AbstractControllerService implements Sch schemaNameMap.put(name, schema); } - @Override - public String retrieveSchemaText(final String schemaName) throws IOException, SchemaNotFoundException { - final RecordSchema schema = schemaNameMap.get(schemaName); - if (schema == null) { - return null; + private RecordSchema retrieveSchemaByName(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final Optional<String> schemaName = schemaIdentifier.getName(); + if (!schemaName.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve 
schema because Schema Name is not present"); } - final Optional<String> text = schema.getSchemaText(); - return text.orElse(null); + return schemaNameMap.get(schemaName); } - @Override - public String retrieveSchemaText(final long schemaId, final int version) throws IOException, SchemaNotFoundException { - final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version); - final RecordSchema schema = schemaIdVersionMap.get(tuple); - if (schema == null) { - return null; + private RecordSchema retrieveSchemaByIdAndVersion(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + final OptionalLong schemaId = schemaIdentifier.getIdentifier(); + if (!schemaId.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Id is not present"); } - final Optional<String> text = schema.getSchemaText(); - return text.orElse(null); - } + final OptionalInt version = schemaIdentifier.getVersion(); + if (!version.isPresent()) { + throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Version is not present"); + } - @Override - public RecordSchema retrieveSchema(final String schemaName) throws IOException, SchemaNotFoundException { - return schemaNameMap.get(schemaName); + final Tuple<Long, Integer> tuple = new Tuple<>(schemaId.getAsLong(), version.getAsInt()); + final RecordSchema schema = schemaIdVersionMap.get(tuple); + return schema; } @Override - public RecordSchema retrieveSchema(final long schemaId, final int version) throws IOException, SchemaNotFoundException { - final Tuple<Long, Integer> tuple = new Tuple<>(schemaId, version); - final RecordSchema schema = schemaIdVersionMap.get(tuple); - return schema; + public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, SchemaNotFoundException { + if (schemaIdentifier.getName().isPresent()) { + return retrieveSchemaByName(schemaIdentifier); + } else { + 
return retrieveSchemaByIdAndVersion(schemaIdentifier); + } } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml index 7b68335..9bbf7a8 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/pom.xml @@ -56,5 +56,11 @@ <artifactId>commons-csv</artifactId> <version>1.4</version> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>1.6.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> </dependencies> </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java index b71e811..2734103 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryStrategy.java @@ -17,6 +17,11 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.stream.io.StreamUtils; + import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -25,10 +30,6 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.stream.io.StreamUtils; - public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy { private final Set<SchemaField> schemaFields; private final SchemaRegistry schemaRegistry; @@ -64,7 +65,13 @@ public class ConfluentSchemaRegistryStrategy implements SchemaAccessStrategy { } final int schemaId = bb.getInt(); - return schemaRegistry.retrieveSchema(schemaId, 1); + + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() + .id(Long.valueOf(schemaId)) + .version(1) + .build(); + + return schemaRegistry.retrieveSchema(schemaIdentifier); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java index 
3677b9f..584f32e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/ConfluentSchemaRegistryWriter.java @@ -17,6 +17,9 @@ package org.apache.nifi.schema.access; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -27,16 +30,13 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { final SchemaIdentifier identifier = schema.getIdentifier(); - final long id = identifier.getIdentifier().getAsLong(); + final Long id = identifier.getIdentifier().getAsLong(); // This encoding follows the pattern that is provided for serializing data by the Confluent Schema Registry serializer // as it is provided at: @@ -45,7 +45,7 @@ public class ConfluentSchemaRegistryWriter implements SchemaAccessWriter { // representing the schema id. 
final ByteBuffer bb = ByteBuffer.allocate(5); bb.put((byte) 0); - bb.putInt((int) id); + bb.putInt(id.intValue()); out.write(bb.array()); } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java index 19606c0..f4fdfcb 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java @@ -19,6 +19,7 @@ package org.apache.nifi.schema.access; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import java.io.IOException; import java.io.InputStream; @@ -34,6 +35,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version"; public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version"; + private final SchemaRegistry schemaRegistry; @@ -84,7 +86,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess final long schemaId = Long.parseLong(schemaIdentifier); final int version = 
Integer.parseInt(schemaVersion); - final RecordSchema schema = schemaRegistry.retrieveSchema(schemaId, version); + final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build(); + final RecordSchema schema = schemaRegistry.retrieveSchema(identifier); if (schema == null) { throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java index dd7d676..bea2f96 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java @@ -29,7 +29,8 @@ import java.util.Set; public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - private static final int LATEST_PROTOCOL_VERSION = 1; + static final int LATEST_PROTOCOL_VERSION = 1; + static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch"; @Override public void 
writeHeader(RecordSchema schema, OutputStream out) throws IOException { @@ -40,13 +41,17 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr final Map<String, String> attributes = new HashMap<>(4); final SchemaIdentifier id = schema.getIdentifier(); - final long schemaId = id.getIdentifier().getAsLong(); - final int schemaVersion = id.getVersion().getAsInt(); + final Long schemaId = id.getIdentifier().getAsLong(); + final Integer schemaVersion = id.getVersion().getAsInt(); attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion)); attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION)); + if (id.getBranch().isPresent()) { + attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get()); + } + return attributes; } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java index 74bde54..077016a 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java @@ -19,6 +19,7 @@ package org.apache.nifi.schema.access; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; import org.apache.nifi.stream.io.StreamUtils; import java.io.IOException; @@ -66,7 +67,8 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt final long schemaId = bb.getLong(); final int schemaVersion = bb.getInt(); - return schemaRegistry.retrieveSchema(schemaId, schemaVersion); + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build(); + return schemaRegistry.retrieveSchema(schemaIdentifier); } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java index bf6a9ea..cb4ed4e 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java @@ -17,6 +17,9 @@ package 
org.apache.nifi.schema.access; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -27,9 +30,6 @@ import java.util.OptionalInt; import java.util.OptionalLong; import java.util.Set; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); private static final int LATEST_PROTOCOL_VERSION = 1; @@ -37,8 +37,8 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { final SchemaIdentifier identifier = schema.getIdentifier(); - final long id = identifier.getIdentifier().getAsLong(); - final int version = identifier.getVersion().getAsInt(); + final Long id = identifier.getIdentifier().getAsLong(); + final Integer version = identifier.getVersion().getAsInt(); // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer // as it is provided at: http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java index 
54a248d..4842df3 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNameAsAttribute.java @@ -19,10 +19,11 @@ package org.apache.nifi.schema.access; import java.io.IOException; import java.io.OutputStream; -import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.OptionalInt; import java.util.Set; import org.apache.nifi.serialization.record.RecordSchema; @@ -30,7 +31,9 @@ import org.apache.nifi.serialization.record.SchemaIdentifier; public class SchemaNameAsAttribute implements SchemaAccessWriter { private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME); - private static final String SCHEMA_NAME_ATTRIBUTE = "schema.name"; + static final String SCHEMA_NAME_ATTRIBUTE = "schema.name"; + static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch"; + static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version"; @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { @@ -38,17 +41,34 @@ public class SchemaNameAsAttribute implements SchemaAccessWriter { @Override public Map<String, String> getAttributes(final RecordSchema schema) { + final Map<String,String> attributes = new HashMap<>(3); + final SchemaIdentifier identifier = schema.getIdentifier(); + final Optional<String> nameOption = identifier.getName(); if (nameOption.isPresent()) { - return Collections.singletonMap(SCHEMA_NAME_ATTRIBUTE, nameOption.get()); + attributes.put(SCHEMA_NAME_ATTRIBUTE, nameOption.get()); + } + + final OptionalInt versionOption = identifier.getVersion(); + if (versionOption.isPresent()) { + attributes.put(SCHEMA_VERSION_ATTRIBUTE, 
String.valueOf(versionOption.getAsInt())); } - return Collections.emptyMap(); + + final Optional<String> branchOption = identifier.getBranch(); + if (branchOption.isPresent()) { + attributes.put(SCHEMA_BRANCH_ATTRIBUTE, branchOption.get()); + } + + return attributes; } @Override public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { final SchemaIdentifier schemaId = schema.getIdentifier(); + if (schemaId == null) { + throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because Schema Identifier is not known"); + } if (!schemaId.getName().isPresent()) { throw new SchemaNotFoundException("Cannot write Schema Name As Attribute because the Schema Name is not known"); } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java index 07ba3f3..257fb58 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/SchemaNamePropertyStrategy.java @@ -17,9 +17,11 @@ package org.apache.nifi.schema.access; +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.schemaregistry.services.SchemaRegistry; import org.apache.nifi.serialization.record.RecordSchema; +import 
org.apache.nifi.serialization.record.SchemaIdentifier; import java.io.InputStream; import java.util.Collections; @@ -32,10 +34,17 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { private final SchemaRegistry schemaRegistry; private final PropertyValue schemaNamePropertyValue; + private final PropertyValue schemaBranchNamePropertyValue; + private final PropertyValue schemaVersionPropertyValue; - public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, final PropertyValue schemaNamePropertyValue) { + public SchemaNamePropertyStrategy(final SchemaRegistry schemaRegistry, + final PropertyValue schemaNamePropertyValue, + final PropertyValue schemaBranchNamePropertyValue, + final PropertyValue schemaVersionPropertyValue) { this.schemaRegistry = schemaRegistry; this.schemaNamePropertyValue = schemaNamePropertyValue; + this.schemaBranchNamePropertyValue = schemaBranchNamePropertyValue; + this.schemaVersionPropertyValue = schemaVersionPropertyValue; schemaFields = new HashSet<>(); schemaFields.add(SchemaField.SCHEMA_NAME); @@ -50,12 +59,33 @@ public class SchemaNamePropertyStrategy implements SchemaAccessStrategy { } try { - final RecordSchema recordSchema = schemaRegistry.retrieveSchema(schemaName); + final String schemaBranchName = schemaBranchNamePropertyValue.evaluateAttributeExpressions(variables).getValue(); + final String schemaVersion = schemaVersionPropertyValue.evaluateAttributeExpressions(variables).getValue(); + + final SchemaIdentifier.Builder identifierBuilder = SchemaIdentifier.builder(); + identifierBuilder.name(schemaName); + + if (!StringUtils.isBlank(schemaBranchName)) { + identifierBuilder.branch(schemaBranchName); + } + + if (!StringUtils.isBlank(schemaVersion)) { + try { + identifierBuilder.version(Integer.valueOf(schemaVersion)); + } catch (NumberFormatException nfe) { + throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + + "' because a non-numeric version was supplied '" + 
schemaVersion + "'", nfe); + } + } + + final RecordSchema recordSchema = schemaRegistry.retrieveSchema(identifierBuilder.build()); if (recordSchema == null) { throw new SchemaNotFoundException("Could not find a schema with name '" + schemaName + "' in the configured Schema Registry"); } return recordSchema; + } catch (final SchemaNotFoundException snf) { + throw snf; } catch (final Exception e) { throw new SchemaNotFoundException("Could not retrieve schema with name '" + schemaName + "' from the configured Schema Registry", e); } http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java new file mode 100644 index 0000000..f613ab8 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/AbstractSchemaAccessStrategyTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Before; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.List; + +public class AbstractSchemaAccessStrategyTest { + + protected SchemaRegistry schemaRegistry; + protected RecordSchema recordSchema; + + @Before + public void setup() { + this.schemaRegistry = Mockito.mock(SchemaRegistry.class); + + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() + .name("person").branch("master").version(1).id(1L).build(); + + this.recordSchema = new SimpleRecordSchema(fields, schemaIdentifier); + } + + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java ---------------------------------------------------------------------- diff --git 
a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java new file mode 100644 index 0000000..a99f3f3 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/SchemaIdentifierMatcher.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.mockito.ArgumentMatcher; + +/** + * ArgumentMatcher for SchemaIdentifier. 
+ */ +public class SchemaIdentifierMatcher extends ArgumentMatcher<SchemaIdentifier> { + + private final SchemaIdentifier expectedIdentifier; + + public SchemaIdentifierMatcher(final SchemaIdentifier expectedIdentifier) { + this.expectedIdentifier = expectedIdentifier; + } + + @Override + public boolean matches(final Object argument) { + if (argument == null || !(argument instanceof SchemaIdentifier)) { + return false; + } + + final SchemaIdentifier other = (SchemaIdentifier) argument; + return other.equals(expectedIdentifier); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java new file mode 100644 index 0000000..6bd585e --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryStrategy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; + +public class TestConfluentSchemaRegistryStrategy extends AbstractSchemaAccessStrategyTest { + + @Test + public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException { + final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry); + + final int schemaId = 123456; + + try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(bytesOut)) { + out.write(0); + out.writeInt(schemaId); + out.flush(); + + try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) { + + // the confluent strategy will read the id from the input stream and use '1' as the version + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .id((long)schemaId) + .version(1) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(recordSchema); + + final RecordSchema retrievedSchema = 
schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema); + assertNotNull(retrievedSchema); + } + } + } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaWithInvalidEncoding() throws IOException, SchemaNotFoundException { + final SchemaAccessStrategy schemaAccessStrategy = new ConfluentSchemaRegistryStrategy(schemaRegistry); + + final int schemaId = 123456; + + try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(bytesOut)) { + out.write(1); // write an invalid magic byte + out.writeInt(schemaId); + out.flush(); + + try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) { + schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java new file mode 100644 index 0000000..8ee8280 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestConfluentSchemaRegistryWriter.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Assert; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class TestConfluentSchemaRegistryWriter { + + @Test + public void testValidateValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter(); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateInvalidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new 
ConfluentSchemaRegistryWriter(); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testWriteHeader() throws IOException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + + final SchemaAccessWriter schemaAccessWriter = new ConfluentSchemaRegistryWriter(); + schemaAccessWriter.writeHeader(recordSchema, out); + + try (final ByteArrayInputStream bytesIn = new ByteArrayInputStream(out.toByteArray()); + final DataInputStream in = new DataInputStream(bytesIn)) { + Assert.assertEquals(0, in.readByte()); + Assert.assertEquals((int) schemaIdentifier.getIdentifier().getAsLong(), in.readInt()); + } + } + + private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + return new SimpleRecordSchema(fields, schemaIdentifier); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java new file mode 100644 index 0000000..851058f --- /dev/null +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceStrategy.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; + +public class TestHortonworksAttributeSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest { + + @Test + public void testGetSchemaWithValidAttributes() throws IOException, SchemaNotFoundException { + final long schemaId = 123456; + final int version = 2; + final int protocol = 1; + + final Map<String,String> attributes = new HashMap<>(); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(version)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(protocol)); + + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .id(schemaId) + .version(version) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(recordSchema); + + final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(attributes, null, recordSchema); + assertNotNull(retrievedSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaMissingAttributes() throws IOException, SchemaNotFoundException { + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksAttributeSchemaReferenceStrategy(schemaRegistry); + schemaAccessStrategy.getSchema(Collections.emptyMap(), null, 
recordSchema); + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java new file mode 100644 index 0000000..ea3be57 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksAttributeSchemaReferenceWriter.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TestHortonworksAttributeSchemaReferenceWriter { + + @Test + public void testValidateWithValidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateWithInvalidSchema() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("test").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + schemaAccessWriter.validateSchema(recordSchema); + } + + @Test + public void testGetAttributesWithoutBranch() { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema); + + Assert.assertEquals(3, attributes.size()); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()), + 
attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); + } + + @Test + public void testGetAttributesWithBranch() { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(123456L).version(2).branch("foo").build(); + final RecordSchema recordSchema = createRecordSchema(schemaIdentifier); + + final SchemaAccessWriter schemaAccessWriter = new HortonworksAttributeSchemaReferenceWriter(); + final Map<String,String> attributes = schemaAccessWriter.getAttributes(recordSchema); + + Assert.assertEquals(4, attributes.size()); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getIdentifier().getAsLong()), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE)); + + Assert.assertEquals(String.valueOf(HortonworksAttributeSchemaReferenceWriter.LATEST_PROTOCOL_VERSION), + attributes.get(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE)); + + Assert.assertEquals("foo", attributes.get(HortonworksAttributeSchemaReferenceWriter.SCHEMA_BRANCH_ATTRIBUTE)); + } + + private RecordSchema createRecordSchema(final SchemaIdentifier schemaIdentifier) { + final List<RecordField> fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + return new SimpleRecordSchema(fields, schemaIdentifier); + } +} 
http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java new file mode 100644 index 0000000..6352eba --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceStrategy.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Collections; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.when; + +public class TestHortonworksEncodedSchemaReferenceStrategy extends AbstractSchemaAccessStrategyTest { + + @Test + public void testGetSchemaWithValidEncoding() throws IOException, SchemaNotFoundException { + final SchemaAccessStrategy schemaAccessStrategy = new HortonworksEncodedSchemaReferenceStrategy(schemaRegistry); + + final int protocol = 1; + final long schemaId = 123456; + final int version = 2; + + try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(bytesOut)) { + out.write(protocol); + out.writeLong(schemaId); + out.writeInt(version); + out.flush(); + + try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) { + + // the stream written above carries the protocol, schema id and schema version, so the Hortonworks encoded strategy is expected to look the schema up by that id and version + final SchemaIdentifier expectedSchemaIdentifier = SchemaIdentifier.builder() + .id(schemaId) + .version(version) + .build(); + + when(schemaRegistry.retrieveSchema(argThat(new SchemaIdentifierMatcher(expectedSchemaIdentifier)))) + .thenReturn(recordSchema); + + final RecordSchema retrievedSchema = schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema); + assertNotNull(retrievedSchema); + } + } + } + + @Test(expected = SchemaNotFoundException.class) + public void testGetSchemaWithInvalidProtocol() throws IOException, SchemaNotFoundException { + final SchemaAccessStrategy schemaAccessStrategy = new
HortonworksEncodedSchemaReferenceStrategy(schemaRegistry); + + final int protocol = 0; // use an invalid protocol + final long schemaId = 123456; + final int version = 2; + + try (final ByteArrayOutputStream bytesOut = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(bytesOut)) { + out.write(protocol); + out.writeLong(schemaId); + out.writeInt(version); + out.flush(); + + try (final ByteArrayInputStream in = new ByteArrayInputStream(bytesOut.toByteArray())) { + schemaAccessStrategy.getSchema(Collections.emptyMap(), in, recordSchema); + } + } + } +} http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java index 5d7f4d7..b0589ea 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestHortonworksEncodedSchemaReferenceWriter.java @@ -17,7 +17,10 @@ package org.apache.nifi.schema.access; -import static org.junit.Assert.assertEquals; +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Test; import java.io.ByteArrayInputStream; 
import java.io.ByteArrayOutputStream; @@ -25,10 +28,7 @@ import java.io.DataInputStream; import java.io.IOException; import java.util.Collections; -import org.apache.nifi.serialization.SimpleRecordSchema; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; -import org.junit.Test; +import static org.junit.Assert.assertEquals; public class TestHortonworksEncodedSchemaReferenceWriter { @@ -36,7 +36,7 @@ public class TestHortonworksEncodedSchemaReferenceWriter { public void testHeader() throws IOException { final HortonworksEncodedSchemaReferenceWriter writer = new HortonworksEncodedSchemaReferenceWriter(); - final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.of("name", 48L, 2)); + final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList(), SchemaIdentifier.builder().name("name").id( 48L).version( 2).build()); final byte[] header; try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { http://git-wip-us.apache.org/repos/asf/nifi/blob/de71a41b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java new file mode 100644 index 0000000..be437a5 --- /dev/null +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/test/java/org/apache/nifi/schema/access/TestSchemaNameAsAttribute.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.schema.access; + +import org.apache.nifi.serialization.SimpleRecordSchema; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class TestSchemaNameAsAttribute { + + private List<RecordField> fields; + private SchemaAccessWriter schemaAccessWriter; + + @Before + public void setup() { + fields = new ArrayList<>(); + fields.add(new RecordField("firstName", RecordFieldType.STRING.getDataType())); + fields.add(new RecordField("lastName", RecordFieldType.STRING.getDataType())); + + schemaAccessWriter = new SchemaNameAsAttribute(); + } + + @Test + public void testWriteNameBranchAndVersion() { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder() + .name("person").branch("master").version(1).id(1L).build(); + + final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier); + + final Map<String,String> attributes = schemaAccessWriter.getAttributes(schema); + 
Assert.assertEquals(3, attributes.size()); + Assert.assertEquals(schemaIdentifier.getName().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE)); + Assert.assertEquals(schemaIdentifier.getBranch().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_BRANCH_ATTRIBUTE)); + Assert.assertEquals(String.valueOf(schemaIdentifier.getVersion().getAsInt()), attributes.get(SchemaNameAsAttribute.SCHEMA_VERSION_ATTRIBUTE)); + } + + @Test + public void testWriteOnlyName() { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("person").id(1L).build(); + + final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier); + + final Map<String,String> attributes = schemaAccessWriter.getAttributes(schema); + Assert.assertEquals(1, attributes.size()); + Assert.assertEquals(schemaIdentifier.getName().get(), attributes.get(SchemaNameAsAttribute.SCHEMA_NAME_ATTRIBUTE)); + } + + @Test + public void testValidateSchemaWhenValid() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().name("person").id(1L).build(); + final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier); + schemaAccessWriter.validateSchema(schema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateSchemaWhenNoIdentifier() throws SchemaNotFoundException { + final RecordSchema schema = new SimpleRecordSchema(fields, null); + schemaAccessWriter.validateSchema(schema); + } + + @Test(expected = SchemaNotFoundException.class) + public void testValidateSchemaWhenNoName() throws SchemaNotFoundException { + final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(1L).build(); + final RecordSchema schema = new SimpleRecordSchema(fields, schemaIdentifier); + schemaAccessWriter.validateSchema(schema); + } +}
