This is an automated email from the ASF dual-hosted git repository. bbende pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 1fe79021b59fb8763b4af3e911658800bed3d41a Author: Pierre Villard <[email protected]> AuthorDate: Wed Mar 4 19:25:26 2020 +0100 NIFI-7221 Initial work --- .../org/apache/nifi/schema/access/SchemaField.java | 1 + .../serialization/record/SchemaIdentifier.java | 16 ++++- .../record/StandardSchemaIdentifier.java | 53 +++++++++++++-- ...ortonworksAttributeSchemaReferenceStrategy.java | 52 ++++++++------- .../HortonworksAttributeSchemaReferenceWriter.java | 29 +++++---- .../HortonworksEncodedSchemaReferenceStrategy.java | 75 ++++++++++++++++------ .../HortonworksEncodedSchemaReferenceWriter.java | 65 ++++++++++++------- .../hortonworks/HortonworksSchemaRegistry.java | 50 ++++++++++++++- 8 files changed, 257 insertions(+), 84 deletions(-) diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java index 1844eea..f577f05 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/schema/access/SchemaField.java @@ -22,6 +22,7 @@ public enum SchemaField { SCHEMA_TEXT_FORMAT("Schema Text Format"), SCHEMA_NAME("Schema Name"), SCHEMA_IDENTIFIER("Schema Identifier"), + SCHEMA_VERSION_ID("Schema-Version Identifier"), SCHEMA_VERSION("Schema Version"), SCHEMA_BRANCH_NAME("Schema Branch Name"); diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java index bca408a..4fb5371 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/SchemaIdentifier.java @@ -39,12 +39,22 @@ public interface SchemaIdentifier { OptionalInt getVersion(); /** + * @return the schema version ID of the schema, if one has been defined. + */ + OptionalLong getSchemaVersionId(); + + /** * @return the name of the branch where the schema is located, if one has been defined */ Optional<String> getBranch(); + /** + * @return the protocol used to get this schema identifier + */ + Integer getProtocol(); + - SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null); + SchemaIdentifier EMPTY = new StandardSchemaIdentifier(null, null, null, null, null, -1); static Builder builder() { return new StandardSchemaIdentifier.Builder(); @@ -61,8 +71,12 @@ public interface SchemaIdentifier { Builder version(Integer version); + Builder schemaVersionId(Long schemaVersionId); + Builder branch(String branch); + Builder protocol(Integer protocol); + SchemaIdentifier build(); } diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java index 712486b..0800982 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/StandardSchemaIdentifier.java @@ -25,15 +25,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { private final Optional<String> name; private final OptionalLong identifier; private final OptionalInt version; + private final OptionalLong schemaVersionId; private final Optional<String> branch; + private final int protocol; - StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, final String branch) { + StandardSchemaIdentifier(final String name, final Long identifier, final Integer version, + final Long schemaVersionId, final String branch, final int protocol) { this.name = Optional.ofNullable(name); - this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier);; - this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version);; + this.identifier = identifier == null ? OptionalLong.empty() : OptionalLong.of(identifier); + this.version = version == null ? OptionalInt.empty() : OptionalInt.of(version); + this.schemaVersionId = schemaVersionId == null ? OptionalLong.empty() : OptionalLong.of(schemaVersionId); this.branch = Optional.ofNullable(branch); + this.protocol = protocol; - if (this.name == null && this.identifier == null) { + if ((this.name == null && this.identifier == null) || this.schemaVersionId == null) { throw new IllegalStateException("Name or Identifier must be provided"); } } @@ -54,13 +59,24 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { } @Override + public OptionalLong getSchemaVersionId() { + return schemaVersionId; + } + + @Override public Optional<String> getBranch() { return branch; } @Override + public Integer getProtocol() { + return protocol; + } + + @Override public int hashCode() { - return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + 41 * getBranch().hashCode(); + return 31 + 41 * getName().hashCode() + 41 * getIdentifier().hashCode() + 41 * getVersion().hashCode() + + 41 * getSchemaVersionId().hashCode() + 41 * getBranch().hashCode(); } @Override @@ -78,9 +94,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { return getName().equals(other.getName()) && getIdentifier().equals(other.getIdentifier()) && getVersion().equals(other.getVersion()) + && getSchemaVersionId().equals(other.getSchemaVersionId()) && getBranch().equals(other.getBranch()); } + @Override + public String toString() { + return "[ name = " + name + ", " + + "identifier = " + identifier + ", " + + "version = " + version + ", " + + "schemaVersionId = " + schemaVersionId + ", " + + "branch = " + branch + ", " + + "protocol = " + protocol + " ]"; + } + /** * Builder to create instances of SchemaIdentifier. */ @@ -90,6 +117,8 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { private String branch; private Long identifier; private Integer version; + private Long schemaVersionId; + private Integer protocol; @Override public SchemaIdentifier.Builder name(final String name) { @@ -116,8 +145,20 @@ public class StandardSchemaIdentifier implements SchemaIdentifier { } @Override + public SchemaIdentifier.Builder schemaVersionId(final Long schemaVersionId) { + this.schemaVersionId = schemaVersionId; + return this; + } + + @Override + public SchemaIdentifier.Builder protocol(final Integer protocol) { + this.protocol = protocol; + return this; + } + + @Override public SchemaIdentifier build() { - return new StandardSchemaIdentifier(name, identifier, version, branch); + return new StandardSchemaIdentifier(name, identifier, version, schemaVersionId, branch, protocol); } } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java index f4fdfcb..b20d683 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceStrategy.java @@ -17,10 +17,6 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - import java.io.IOException; import java.io.InputStream; import java.util.Collections; @@ -28,16 +24,20 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccessStrategy { private final Set<SchemaField> schemaFields; public static final String SCHEMA_ID_ATTRIBUTE = "schema.identifier"; public static final String SCHEMA_VERSION_ATTRIBUTE = "schema.version"; public static final String SCHEMA_PROTOCOL_VERSION_ATTRIBUTE = "schema.protocol.version"; - + public static final String SCHEMA_VERSION_ID_ATTRIBUTE = "schema.version.id"; private final SchemaRegistry schemaRegistry; - + static final int LATEST_PROTOCOL_VERSION = 3; public HortonworksAttributeSchemaReferenceStrategy(final SchemaRegistry schemaRegistry) { this.schemaRegistry = schemaRegistry; @@ -45,6 +45,7 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess schemaFields = new HashSet<>(); schemaFields.add(SchemaField.SCHEMA_IDENTIFIER); schemaFields.add(SchemaField.SCHEMA_VERSION); + schemaFields.add(SchemaField.SCHEMA_VERSION_ID); schemaFields.addAll(schemaRegistry == null ? Collections.emptySet() : schemaRegistry.getSuppliedSchemaFields()); } @@ -57,7 +58,8 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess final String schemaIdentifier = variables.get(SCHEMA_ID_ATTRIBUTE); final String schemaVersion = variables.get(SCHEMA_VERSION_ATTRIBUTE); final String schemaProtocol = variables.get(SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); - if (schemaIdentifier == null || schemaVersion == null || schemaProtocol == null) { + final String schemaVersionId = variables.get(SCHEMA_VERSION_ID_ATTRIBUTE); + if ((schemaVersionId == null && (schemaIdentifier == null || schemaVersion == null)) || schemaProtocol == null) { throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because it is missing one of the following three required attributes: " + SCHEMA_ID_ATTRIBUTE + ", " + SCHEMA_VERSION_ATTRIBUTE + ", " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE); } @@ -68,28 +70,34 @@ public class HortonworksAttributeSchemaReferenceStrategy implements SchemaAccess } final int protocol = Integer.parseInt(schemaProtocol); - if (protocol != 1) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_PROTOCOL_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Protocol Version number. Expected Protocol Version to be 1."); + if (protocol > LATEST_PROTOCOL_VERSION) { + throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " + + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocol + " or was not encoded with this data format"); } - if (!isNumber(schemaIdentifier)) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Identifier number"); - } + SchemaIdentifier identifier; + if (!isNumber(schemaVersionId)) { + if (!isNumber(schemaIdentifier)) { + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_ID_ATTRIBUTE + " has a value of '" + + schemaProtocol + "', which is not a valid Schema Identifier number"); + } - if (!isNumber(schemaVersion)) { - throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" - + schemaProtocol + "', which is not a valid Schema Version number"); - } + if (!isNumber(schemaVersion)) { + throw new SchemaNotFoundException("Could not determine Schema for " + variables + " because the " + SCHEMA_VERSION_ATTRIBUTE + " has a value of '" + + schemaProtocol + "', which is not a valid Schema Version number"); + } - final long schemaId = Long.parseLong(schemaIdentifier); - final int version = Integer.parseInt(schemaVersion); + final long schemaId = Long.parseLong(schemaIdentifier); + final int version = Integer.parseInt(schemaVersion); + identifier = SchemaIdentifier.builder().id(schemaId).version(version).protocol(protocol).build(); + } else { + final long svi = Long.parseLong(schemaVersionId); + identifier = SchemaIdentifier.builder().schemaVersionId(svi).protocol(protocol).build(); + } - final SchemaIdentifier identifier = SchemaIdentifier.builder().id(schemaId).version(version).build(); final RecordSchema schema = schemaRegistry.retrieveSchema(identifier); if (schema == null) { - throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + schemaId + "' and Version '" + version + "'"); + throw new SchemaNotFoundException("Could not find a Schema in the Schema Registry with Schema Identifier '" + identifier.toString() + "'"); } return schema; diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java index bea2f96..ad4558f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksAttributeSchemaReferenceWriter.java @@ -17,9 +17,6 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - import java.io.IOException; import java.io.OutputStream; import java.util.EnumSet; @@ -27,9 +24,12 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWriter { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - static final int LATEST_PROTOCOL_VERSION = 1; + static final int LATEST_PROTOCOL_VERSION = 3; static final String SCHEMA_BRANCH_ATTRIBUTE = "schema.branch"; @Override @@ -46,23 +46,30 @@ public class HortonworksAttributeSchemaReferenceWriter implements SchemaAccessWr attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_ID_ATTRIBUTE, String.valueOf(schemaId)); attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ATTRIBUTE, String.valueOf(schemaVersion)); - attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(LATEST_PROTOCOL_VERSION)); + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_PROTOCOL_VERSION_ATTRIBUTE, String.valueOf(id.getProtocol())); if (id.getBranch().isPresent()) { attributes.put(SCHEMA_BRANCH_ATTRIBUTE, id.getBranch().get()); } + if (id.getSchemaVersionId().isPresent()) { + attributes.put(HortonworksAttributeSchemaReferenceStrategy.SCHEMA_VERSION_ID_ATTRIBUTE, String.valueOf(id.getSchemaVersionId().getAsLong())); + } + return attributes; } @Override public void validateSchema(final RecordSchema schema) throws SchemaNotFoundException { - final SchemaIdentifier id = schema.getIdentifier(); - if (!id.getIdentifier().isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Identifier"); - } - if (!id.getVersion().isPresent()) { - throw new SchemaNotFoundException("Cannot write Schema Reference as Attributes because it does not contain a Schema Version"); + final SchemaIdentifier identifier = schema.getIdentifier(); + + if(!identifier.getSchemaVersionId().isPresent()) { + if (!identifier.getIdentifier().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); + } + if (!identifier.getVersion().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); + } } } diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java index 077016a..8f3c1b4 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceStrategy.java @@ -17,11 +17,6 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; -import org.apache.nifi.stream.io.StreamUtils; - import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -30,8 +25,13 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.nifi.schemaregistry.services.SchemaRegistry; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; +import org.apache.nifi.stream.io.StreamUtils; + public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessStrategy { - private static final int LATEST_PROTOCOL_VERSION = 1; + private static final int LATEST_PROTOCOL_VERSION = 3; private final Set<SchemaField> schemaFields; private final SchemaRegistry schemaRegistry; @@ -47,28 +47,67 @@ public class HortonworksEncodedSchemaReferenceStrategy implements SchemaAccessSt @Override public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws SchemaNotFoundException, IOException { - final byte[] buffer = new byte[13]; + final byte[] buffer = new byte[1]; try { StreamUtils.fillBuffer(contentStream, buffer); } catch (final IOException ioe) { - throw new SchemaNotFoundException("Could not read first 13 bytes from stream", ioe); + throw new SchemaNotFoundException("Could not read first byte from stream", ioe); } // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer - // as it is provided at: - // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java + // See: https://registry-project.readthedocs.io/en/latest/serdes.html# final ByteBuffer bb = ByteBuffer.wrap(buffer); final int protocolVersion = bb.get(); - if (protocolVersion != 1) { - throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " - + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format"); - } + SchemaIdentifier schemaIdentifier; + + switch(protocolVersion) { + case 1: + final byte[] bufferv1 = new byte[12]; + + try { + StreamUtils.fillBuffer(contentStream, bufferv1); + } catch (final IOException ioe) { + throw new SchemaNotFoundException("Could not read bytes from stream", ioe); + } + final ByteBuffer bbv1 = ByteBuffer.wrap(buffer); + + final long schemaId = bbv1.getLong(); + final int schemaVersion = bbv1.getInt(); + schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).protocol(protocolVersion).build(); + return schemaRegistry.retrieveSchema(schemaIdentifier); + + case 2: + final byte[] bufferv2 = new byte[8]; - final long schemaId = bb.getLong(); - final int schemaVersion = bb.getInt(); + try { + StreamUtils.fillBuffer(contentStream, bufferv2); + } catch (final IOException ioe) { + throw new SchemaNotFoundException("Could not read bytes from stream", ioe); + } + final ByteBuffer bbv2 = ByteBuffer.wrap(buffer); - final SchemaIdentifier schemaIdentifier = SchemaIdentifier.builder().id(schemaId).version(schemaVersion).build(); - return schemaRegistry.retrieveSchema(schemaIdentifier); + final long sviLong = bbv2.getLong(); + schemaIdentifier = SchemaIdentifier.builder().schemaVersionId(sviLong).protocol(protocolVersion).build(); + return schemaRegistry.retrieveSchema(schemaIdentifier); + + case 3: + final byte[] bufferv3 = new byte[4]; + + try { + StreamUtils.fillBuffer(contentStream, bufferv3); + } catch (final IOException ioe) { + throw new SchemaNotFoundException("Could not read bytes from stream", ioe); + } + final ByteBuffer bbv3 = ByteBuffer.wrap(buffer); + + final int sviInt = bbv3.getInt(); + schemaIdentifier = SchemaIdentifier.builder().schemaVersionId((long) sviInt).protocol(protocolVersion).build(); + return schemaRegistry.retrieveSchema(schemaIdentifier); + + default: + throw new SchemaNotFoundException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " + + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + protocolVersion + " or was not encoded with this data format"); + } } @Override diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java index cb4ed4e..99dbd1f 100644 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java +++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/schema/access/HortonworksEncodedSchemaReferenceWriter.java @@ -17,38 +17,57 @@ package org.apache.nifi.schema.access; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.Collections; import java.util.EnumSet; import java.util.Map; -import java.util.OptionalInt; -import java.util.OptionalLong; import java.util.Set; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWriter { private static final Set<SchemaField> requiredSchemaFields = EnumSet.of(SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); - private static final int LATEST_PROTOCOL_VERSION = 1; + private static final int LATEST_PROTOCOL_VERSION = 3; @Override public void writeHeader(final RecordSchema schema, final OutputStream out) throws IOException { final SchemaIdentifier identifier = schema.getIdentifier(); - final Long id = identifier.getIdentifier().getAsLong(); - final Integer version = identifier.getVersion().getAsInt(); - // This decoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer - // as it is provided at: - // https://github.com/hortonworks/registry/blob/master/schema-registry/serdes/src/main/java/com/hortonworks/registries/schemaregistry/serdes/avro/AvroSnapshotSerializer.java - final ByteBuffer bb = ByteBuffer.allocate(13); - bb.put((byte) LATEST_PROTOCOL_VERSION); - bb.putLong(id); - bb.putInt(version); + // This encoding follows the pattern that is provided for serializing data by the Hortonworks Schema Registry serializer + // See: https://registry-project.readthedocs.io/en/latest/serdes.html# + switch(identifier.getProtocol()) { + case 1: + final Long id = identifier.getIdentifier().getAsLong(); + final Integer version = identifier.getVersion().getAsInt(); + final ByteBuffer bbv1 = ByteBuffer.allocate(13); + bbv1.put((byte) 1); + bbv1.putLong(id); + bbv1.putInt(version); + out.write(bbv1.array()); + return; + case 2: + final Long sviV2 = identifier.getIdentifier().getAsLong(); + final ByteBuffer bbv2 = ByteBuffer.allocate(9); + bbv2.put((byte) 2); + bbv2.putLong(sviV2); + out.write(bbv2.array()); + return; + case 3: + final Long sviV3 = identifier.getIdentifier().getAsLong(); + final ByteBuffer bbv3 = ByteBuffer.allocate(5); + bbv3.put((byte) 3); + bbv3.putInt(sviV3.intValue()); + out.write(bbv3.array()); + return; + default: + throw new IOException("Schema Encoding appears to be of an incompatible version. The latest known Protocol is Version " + + LATEST_PROTOCOL_VERSION + " but the data was encoded with version " + identifier.getProtocol() + " or was not encoded with this data format"); + } + - out.write(bb.array()); } @Override @@ -59,14 +78,14 @@ public class HortonworksEncodedSchemaReferenceWriter implements SchemaAccessWrit @Override public void validateSchema(RecordSchema schema) throws SchemaNotFoundException { final SchemaIdentifier identifier = schema.getIdentifier(); - final OptionalLong identifierOption = identifier.getIdentifier(); - if (!identifierOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); - } - final OptionalInt versionOption = identifier.getVersion(); - if (!versionOption.isPresent()) { - throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); + if(!identifier.getSchemaVersionId().isPresent()) { + if (!identifier.getIdentifier().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Identifier is not known"); + } + if (!identifier.getVersion().isPresent()) { + throw new SchemaNotFoundException("Cannot write Encoded Schema Reference because the Schema Version is not known"); + } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java index 0e42afc..c26c55a 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java @@ -17,6 +17,7 @@ package org.apache.nifi.schemaregistry.hortonworks; import com.google.common.collect.ImmutableMap; +import com.hortonworks.registries.schemaregistry.SchemaIdVersion; import com.hortonworks.registries.schemaregistry.SchemaMetadata; import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo; import com.hortonworks.registries.schemaregistry.SchemaVersionInfo; @@ -64,8 +65,13 @@ import java.util.concurrent.TimeUnit; @Tags({"schema", "registry", "avro", "hortonworks", "hwx"}) @CapabilityDescription("Provides a Schema Registry Service that interacts with a Hortonworks Schema Registry, available at https://github.com/hortonworks/registry") public class HortonworksSchemaRegistry extends AbstractControllerService implements SchemaRegistry { - private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_BRANCH_NAME, SchemaField.SCHEMA_TEXT, - SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION); + private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, + SchemaField.SCHEMA_BRANCH_NAME, + SchemaField.SCHEMA_TEXT, + SchemaField.SCHEMA_TEXT_FORMAT, + SchemaField.SCHEMA_IDENTIFIER, + SchemaField.SCHEMA_VERSION, + SchemaField.SCHEMA_VERSION_ID); private static final String CLIENT_SSL_PROPERTY_PREFIX = "schema.registry.client.ssl"; @@ -420,6 +426,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .name(schemaName.get()) .branch(schemaBranchName.orElse(null)) .version(versionInfo.getVersion()) + .protocol(schemaIdentifier.getProtocol()) .build(); final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText); @@ -470,6 +477,7 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme .name(schemaName) .id(schemaId.getAsLong()) .version(version.getAsInt()) + .protocol(schemaIdentifier.getProtocol()) .build(); final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText); @@ -481,13 +489,49 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme @Override public RecordSchema retrieveSchema(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { - if (schemaIdentifier.getIdentifier().isPresent()) { + if (schemaIdentifier.getSchemaVersionId().isPresent()) { + return retrieveSchemaBySchemaVersionId(schemaIdentifier); + } else if (schemaIdentifier.getIdentifier().isPresent()) { return retrieveSchemaByIdAndVersion(schemaIdentifier); } else { return retrieveSchemaByName(schemaIdentifier); } } + private RecordSchema retrieveSchemaBySchemaVersionId(final SchemaIdentifier schemaIdentifier) throws IOException, org.apache.nifi.schema.access.SchemaNotFoundException { + final SchemaRegistryClient client = getClient(); + final OptionalLong schemaVersionId = schemaIdentifier.getSchemaVersionId(); + + final SchemaIdVersion svi = new SchemaIdVersion(schemaVersionId.getAsLong()); + + final String schemaName; + final SchemaVersionInfo versionInfo; + + try { + versionInfo = client.getSchemaVersionInfo(svi); + schemaName = versionInfo.getName(); + } catch (final Exception e) { + handleException("Failed to retrieve schema with Schema Version ID '" + schemaVersionId.getAsLong() + "'", e); + return null; + } + + final String schemaText = versionInfo.getSchemaText(); + + final SchemaIdentifier resultSchemaIdentifier = SchemaIdentifier.builder() + .name(schemaName) + .id(versionInfo.getSchemaMetadataId()) + .version(versionInfo.getVersion()) + .schemaVersionId(schemaVersionId.getAsLong()) + .protocol(schemaIdentifier.getProtocol()) + .build(); + + final Tuple<SchemaIdentifier, String> tuple = new Tuple<>(resultSchemaIdentifier, schemaText); + return schemaNameToSchemaMap.computeIfAbsent(tuple, t -> { + final Schema schema = new Schema.Parser().parse(schemaText); + return AvroTypeUtil.createSchema(schema, schemaText, resultSchemaIdentifier); + }); + } + private String createErrorMessage(final String baseMessage, final Optional<String> schemaName, final Optional<String> branchName, final OptionalInt version) { final StringBuilder builder = new StringBuilder(baseMessage) .append(" with name '")
