nictownsend commented on code in PR #24715: URL: https://github.com/apache/flink/pull/24715#discussion_r1618968305
########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioRegistryAvroFormatFactory.java: ########## @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.Projection; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.format.ProjectableDecodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.table.types.logical.RowType; + +import org.apache.avro.Schema; +import org.apache.avro.Schema.Parser; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.lang.String.format; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.BASIC_AUTH_CREDENTIALS_USERID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_ID; +import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_CLIENT_SECRET; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_SCOPE; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.OIDC_AUTH_URL; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.PROPERTIES; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_DESCRIPTION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_ID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_NAME; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.REGISTERED_ARTIFACT_VERSION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SCHEMA; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_TRUSTSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.URL; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; + +/** Apicurio avro format. */ +@Internal +public class ApicurioRegistryAvroFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { + + // used this key rather than apicurio-avro (the format name) to be consistent with Confluent. 
+ public static final String IDENTIFIER = "avro-apicurio"; + + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + String schemaRegistryURL = formatOptions.get(URL); + Optional<String> schemaString = formatOptions.getOptional(SCHEMA); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, + DataType producedDataType, + int[][] projections) { + producedDataType = Projection.of(projections).project(producedDataType); + final RowType rowType = (RowType) producedDataType.getLogicalType(); + // schema has been supplied + final Schema schema = + schemaString + .map(s -> getAvroSchema(s, rowType)) + .orElse(AvroSchemaConverter.convertToSchema(rowType)); + final TypeInformation<RowData> rowDataTypeInfo = + context.createTypeInformation(producedDataType); + return new AvroRowDataDeserializationSchema( + ApicurioRegistryAvroDeserializationSchema.forGeneric( + schema, schemaRegistryURL, optionalPropertiesMap), + AvroToRowDataConverters.createRowConverter(rowType), + rowDataTypeInfo); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + String schemaRegistryURL = formatOptions.get(URL); + Optional<String> schemaString = formatOptions.getOptional(SCHEMA); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + + return new EncodingFormat<SerializationSchema<RowData>>() { + @Override + public SerializationSchema<RowData> createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + final RowType rowType = (RowType) consumedDataType.getLogicalType(); + final Schema schema = + schemaString + .map(s -> getAvroSchema(s, rowType)) + .orElse(AvroSchemaConverter.convertToSchema(rowType)); + return new AvroRowDataSerializationSchema( + rowType, + ApicurioRegistryAvroSerializationSchema.forGeneric( + schema, schemaRegistryURL, optionalPropertiesMap), + RowDataToAvroConverters.createConverter(rowType)); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(URL); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return getOptionalOptions(); + } + + protected static Set<ConfigOption<?>> getOptionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(USE_HEADERS); + options.add(USE_GLOBALID); + options.add(SCHEMA); + options.add(REGISTERED_ARTIFACT_NAME); + options.add(REGISTERED_ARTIFACT_DESCRIPTION); + 
options.add(REGISTERED_ARTIFACT_ID); + options.add(REGISTERED_ARTIFACT_VERSION); + options.add(PROPERTIES); + options.add(SSL_KEYSTORE_LOCATION); + options.add(SSL_KEYSTORE_PASSWORD); + options.add(SSL_TRUSTSTORE_LOCATION); + options.add(SSL_TRUSTSTORE_PASSWORD); + options.add(BASIC_AUTH_CREDENTIALS_USERID); + options.add(BASIC_AUTH_CREDENTIALS_PASSWORD); + options.add(OIDC_AUTH_URL); + options.add(OIDC_AUTH_CLIENT_ID); + options.add(OIDC_AUTH_CLIENT_SECRET); + options.add(OIDC_AUTH_SCOPE); + options.add(OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION); + + return options; + } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + URL, + USE_HEADERS, + USE_GLOBALID, + SCHEMA, + REGISTERED_ARTIFACT_NAME, + REGISTERED_ARTIFACT_DESCRIPTION, + REGISTERED_ARTIFACT_ID, + REGISTERED_ARTIFACT_VERSION, + PROPERTIES, + SSL_KEYSTORE_LOCATION, + SSL_KEYSTORE_PASSWORD, + SSL_TRUSTSTORE_LOCATION, + SSL_TRUSTSTORE_PASSWORD, + BASIC_AUTH_CREDENTIALS_USERID, + BASIC_AUTH_CREDENTIALS_PASSWORD, + OIDC_AUTH_URL, + OIDC_AUTH_CLIENT_ID, + OIDC_AUTH_CLIENT_SECRET, + OIDC_AUTH_SCOPE, + OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION) + .collect(Collectors.toSet()); + } + + public static @Nullable Map<String, Object> buildOptionalPropertiesMap( + ReadableConfig formatOptions) { + final Map<String, Object> properties = new HashMap<>(); + + Set<ConfigOption<?>> configOptionsArray = getOptionalOptions(); + + for (ConfigOption configOption : configOptionsArray) { + formatOptions + .getOptional(configOption) + .ifPresent(v -> properties.put(configOption.key(), v)); + // when there is no value we would like to set the default as the value. + // at java 11 we can use the orElse or isEmpty , but we are java 8 + String configOptionKey = configOption.key(); + if (properties.get(configOptionKey) == null && configOption.hasDefaultValue()) { + properties.put(configOptionKey, configOption.defaultValue()); + } + } + return properties; + } + + private static Schema getAvroSchema(String schemaString, RowType rowType) { + LogicalType convertedDataType = + AvroSchemaConverter.convertToDataType(schemaString).getLogicalType(); + + if (convertedDataType.isNullable()) { + convertedDataType = convertedDataType.copy(false); + } + + if (!convertedDataType.equals(rowType)) { + throw new IllegalArgumentException( + format( + "Schema provided for '%s' format does not match the table schema: %s", + IDENTIFIER, schemaString)); + } + + return new Parser().parse(schemaString); Review Comment: Can this throw errors? I'd expect a parse error or equivalent checked exception? ########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.formats.avro.AvroFormatOptions; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; +import io.apicurio.registry.rest.v2.beans.ArtifactReference; +import io.apicurio.registry.rest.v2.beans.IfExists; +import io.apicurio.registry.rest.v2.beans.VersionMetaData; +import io.apicurio.registry.types.ArtifactType; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; + +/** Reads and Writes schema using Avro Schema Registry protocol. */ +public class ApicurioSchemaRegistryCoder implements SchemaCoder { + + public static final String APICURIO_KEY_GLOBAL_ID_HEADER = "apicurio.key.globalId"; + + public static final String APICURIO_KEY_CONTENT_ID_HEADER = "apicurio.key.contentId"; + + public static final String APICURIO_VALUE_GLOBAL_ID_HEADER = "apicurio.value.globalId"; + + public static final String APICURIO_VALUE_CONTENT_ID_HEADER = "apicurio.value.contentId"; + + public static final String APICURIO_VALUE_ENCODING = "apicurio.value.encoding"; + + private final RegistryClient registryClient; + + private final Map<String, Object> configs; + + private static final int MAGIC_BYTE = 0; + + public static final String TOPIC_NAME = "TOPIC_NAME"; + + public static final String HEADERS = "HEADERS"; + + public static final String IS_KEY = "IS_KEY"; + + private static final Logger LOG = LoggerFactory.getLogger(ApicurioSchemaRegistryCoder.class); + + /** + * Creates {@link SchemaCoder} that uses provided {@link RegistryClient} to connect to schema + * registry. + * + * @param registryClient client to connect schema registry + * @param configs map for registry configs + */ + public ApicurioSchemaRegistryCoder(RegistryClient registryClient, Map<String, Object> configs) { + this.registryClient = registryClient; + this.configs = configs; + } + + @Override + public Schema readSchema(InputStream in) throws IOException { + throw new IOException( + "Incompatible Kafka connector. Use a Kafka connector that passes headers"); + } + + public long bytesToLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(bytes); + buffer.flip(); // need flip + return buffer.getLong(); + } + + public static byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(x); + return buffer.array(); + } + + /** + * Get the Avro schema using the Apicurio Registry. In order to call the registry, we need to + * get the globalId of the Avro Schema in the registry. If the format is configured to expect + * headers, then the globalId will be obtained from the headers map. Otherwise, the globalId is + * obtained from the message payload, depending on the format configuration. This format is only + * used with the Kafka connector. 
The Kafka connector lives in another repository. This
+ * repository does not have any Kafka dependencies; it is therefore not possible for us to use
+ * the Kafka consumer record here passing a deserialization handler. So we have to get the
+ * information we need directly from the headers or the payload itself.
+ *
+ * @param in input stream
+ * @param additionalParameters map of additional parameters - including the headers and whether
+ * this is a key or value
+ * @return the Avro Schema
+ * @throws IOException if there is an error
+ */
+ @Override
+ public Schema readSchemaWithAdditionalParameters(
+ InputStream in, Map<String, Object> additionalParameters) throws IOException {
+ String methodName = "readSchemaWithAdditionalParameters";
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(methodName + " entered");
+ }
+ if (in == null) {
+ return null;
+ }
+ boolean useGlobalId = (boolean) configs.get(USE_GLOBALID.key());
+
+ long schemaId = getSchemaId(in, additionalParameters, useGlobalId);
+
+ if (!useGlobalId) {
+ // if we are using the content ID there is no get that will dereference,
+ // so we need to get the artifact references, find the latest version,
+ // using its groupId and artifactId we can get the globalId, from which we can get
+ // a de-referenced schema
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(methodName + " got using contentId " + schemaId);
+ }
+ List<ArtifactReference> artifactReferenceList =
+ registryClient.getArtifactReferencesByContentId(schemaId);

Review Comment: Is there an Apicurio version check needed here? Is this an API that only exists on some Apicurio versions?

########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ##########
@@ -0,0 +1,379 @@
+ /**
+ * Get the schema id.
+ *
+ * @param in the Kafka body as an input stream
+ * @param additionalParameters additional parameters containing the headers and whether we are
+ * processing a key or value
+ * @param useGlobalID when true, globalIds are used, otherwise contentIds
+ * @return the id of the schema
+ * @throws IOException error occurred
+ */
+ protected long getSchemaId(
+ InputStream in, Map<String, Object> additionalParameters, boolean useGlobalID)

Review Comment: Rather than passing in `additionalParameters`, is it cleaner to add a few parameters for `isKey`, `useHeaders`, and `Map<String,Object> headers` - so that this method doesn't have to know how to retrieve them from the nested map?
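To illustrate, a rough sketch of the signature I have in mind — untested, parameter names are only suggestions; the header-name constants, `bytesToLong`, and `MAGIC_BYTE` are the ones already defined in this class, and the payload layout (magic byte followed by an 8-byte id) is assumed from this PR:

```java
// Hypothetical refactor: the caller unpacks additionalParameters once, so this
// method needs no knowledge of the HEADERS/IS_KEY map keys and no unchecked casts.
protected long getSchemaId(
        InputStream in, boolean isKey, boolean useHeaders,
        Map<String, byte[]> headers, boolean useGlobalId)
        throws IOException {
    if (useHeaders) {
        // Pick the header that this PR's constants define for the key/value side.
        String headerName =
                isKey
                        ? (useGlobalId ? APICURIO_KEY_GLOBAL_ID_HEADER : APICURIO_KEY_CONTENT_ID_HEADER)
                        : (useGlobalId ? APICURIO_VALUE_GLOBAL_ID_HEADER : APICURIO_VALUE_CONTENT_ID_HEADER);
        byte[] idBytes = headers.get(headerName);
        if (idBytes == null) {
            throw new IOException("Expected Kafka header '" + headerName + "' carrying the schema id");
        }
        return bytesToLong(idBytes);
    }
    // Otherwise the id is assumed to be in the payload: magic byte + 8-byte id.
    DataInputStream dataIn = new DataInputStream(in);
    if (dataIn.readByte() != MAGIC_BYTE) {
        throw new IOException("Unknown data format: magic byte not found");
    }
    return dataIn.readLong();
}
```

It would also give one obvious place to fail fast with a clear message when a header is missing, rather than an NPE from a missing map entry.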
########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/RegistryAvroSerializationSchema.java: ########## @@ -107,17 +108,37 @@ public static RegistryAvroSerializationSchema<GenericRecord> forGeneric( } @Override - public byte[] serialize(T object) { - checkAvroInitialized(); + public byte[] serialize(T element) { + return serializeWithAdditionalProperties(element, null, null); + } + @Override + public byte[] serializeWithAdditionalProperties( + T object, + Map<String, Object> inputAdditionalProperties, + Map<String, Object> outputAdditionalProperties) { + checkAvroInitialized(); if (object == null) { return null; } else { try { ByteArrayOutputStream outputStream = getOutputStream(); outputStream.reset(); Encoder encoder = getEncoder(); - schemaCoder.writeSchema(getSchema(), outputStream); + if (inputAdditionalProperties == null) { Review Comment: Why are we failing here? Why are we not checking that outputProperties is not null? ########## flink-formats/flink-avro-apicurio-registry/src/test/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCodergetSchemaTest.java: ########## @@ -0,0 +1,363 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.configuration.ConfigOption; + +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList; + +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for schemacoder. 
*/ +public class ApicurioSchemaRegistryCodergetSchemaTest { + + @ParameterizedTest + @MethodSource("configProvider") + public void getSchemaId(TestSpec testSpec) { + try { + + if (testSpec.extraConfigOptions == null) { + testSpec.extraConfigOptions = new HashMap<>(); + testSpec.extraConfigOptions.put(USE_GLOBALID.key(), true); + testSpec.extraConfigOptions.put(USE_HEADERS.key(), true); + } + + // get options map + Map<String, Object> registryConfigs = + getConfig(testSpec.isKey, testSpec.extraConfigOptions); + // this is just to have something in the stream + String str = readFile("src/test/resources/simple1.avro", StandardCharsets.UTF_8); + + byte[] schemaBytes = str.getBytes(StandardCharsets.UTF_8); + InputStream in; + if (testSpec.byteArray == null) { + in = new ByteArrayInputStream(schemaBytes); + } else { + // the byteArray is the magic byte and schema id that should be at the start of the + // body + byte[] combined = new byte[testSpec.byteArray.length + schemaBytes.length]; + ByteBuffer byteBuffer = ByteBuffer.wrap(combined); + byteBuffer.put(testSpec.byteArray); + byteBuffer.put(schemaBytes); + combined = byteBuffer.array(); + in = new ByteArrayInputStream(combined); + } + + ApicurioSchemaRegistryCoder apicurioSchemaRegistryCoder = Review Comment: Worth some tests that provide an empty map for `registryConfigs` to tease out the null pointers mentioned above. I appreciate that the config itself is built with defaults, but I don't know if it's safe to assume that someone will only call the constructor from the default config? ########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

+ /**
+ * Get the schema id.
+ *
+ * @param in the Kafka body as an input stream
+ * @param additionalParameters additional parameters containing the headers and whether we are
+ * processing a key or value
+ * @param useGlobalID when true, globalIds are used, otherwise contentIds
+ * @return the id of the schema
+ * @throws IOException error occurred
+ */
+ protected long getSchemaId(
+ InputStream in, Map<String, Object> additionalParameters, boolean useGlobalID)
+ throws IOException {
+ long schemaId = 0L;
+ Map<String, Object> headers = (Map<String, Object>) additionalParameters.get(HEADERS);

Review Comment: Same issue here re null pointers on casting

########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/AvroApicurioFormatOptions.java: ##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.avro.registry.apicurio;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import io.apicurio.registry.serde.SerdeConfig;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** Options for Schema Registry Avro format. */
+@PublicEvolving
+public class AvroApicurioFormatOptions {
+
+ public static final ConfigOption<String> URL =
+ ConfigOptions.key("url")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The URL of the Apicurio Registry to fetch/register schemas.");
+
+ public static final ConfigOption<Boolean> USE_HEADERS =
+ ConfigOptions.key("use-headers")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Used by serializers and deserializers. Configures the identifier for artifacts. \n"
+ + "true means use global ID, false means use content ID.\n"
+ + "Instructs the serializer to write the specified ID to Kafka,\n"
+ + "and instructs the deserializer to use this ID to find the schema.");
+
+ public static final ConfigOption<Boolean> USE_GLOBALID =
+ ConfigOptions.key("use-globalid")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Configures to read/write the artifact identifier to "
+ + "Kafka message headers instead of in the message payload.");
+
+ public static final ConfigOption<String> GROUP_ID =
+ ConfigOptions.key("groupId")
+ .stringType()
+ .defaultValue("default")
+ .withDescription("GroupId used by deserializers.");
+
+ public static final ConfigOption<String> SCHEMA =
+ ConfigOptions.key("schema")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The schema registered or to be registered in the Apicurio Registry. "
+ + "If no schema is provided Flink converts the table schema to avro schema. "
" + + "The schema provided must match the table schema ('avro-apicurio')."); Review Comment: https://github.com/apache/flink/pull/24715/files#diff-d8f33c08f59e3f895d0fa092cb551c8e3378f3a20d19c4845384377909869057R268 - is doing an equality check, not a compatibility check. Which currently reads like: - If you do not provide a schema, the writer schema is generated from the table - If you do provide a schema, it must match the row type (which makes the ability to provide a schema moot) If this is for providing the writer schema, I would expect the `.equals()` to be a compatibility check instead. ########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.formats.avro.AvroFormatOptions; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; +import io.apicurio.registry.rest.v2.beans.ArtifactReference; +import io.apicurio.registry.rest.v2.beans.IfExists; +import io.apicurio.registry.rest.v2.beans.VersionMetaData; +import io.apicurio.registry.types.ArtifactType; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; + +/** Reads and Writes schema using Avro Schema Registry protocol. 
*/ +public class ApicurioSchemaRegistryCoder implements SchemaCoder { + + public static final String APICURIO_KEY_GLOBAL_ID_HEADER = "apicurio.key.globalId"; + + public static final String APICURIO_KEY_CONTENT_ID_HEADER = "apicurio.key.contentId"; + + public static final String APICURIO_VALUE_GLOBAL_ID_HEADER = "apicurio.value.globalId"; + + public static final String APICURIO_VALUE_CONTENT_ID_HEADER = "apicurio.value.contentId"; + + public static final String APICURIO_VALUE_ENCODING = "apicurio.value.encoding"; + + private final RegistryClient registryClient; + + private final Map<String, Object> configs; + + private static final int MAGIC_BYTE = 0; + + public static final String TOPIC_NAME = "TOPIC_NAME"; + + public static final String HEADERS = "HEADERS"; + + public static final String IS_KEY = "IS_KEY"; + + private static final Logger LOG = LoggerFactory.getLogger(ApicurioSchemaRegistryCoder.class); + + /** + * Creates {@link SchemaCoder} that uses provided {@link RegistryClient} to connect to schema + * registry. + * + * @param registryClient client to connect schema registry + * @param configs map for registry configs + */ + public ApicurioSchemaRegistryCoder(RegistryClient registryClient, Map<String, Object> configs) { + this.registryClient = registryClient; + this.configs = configs; + } + + @Override + public Schema readSchema(InputStream in) throws IOException { + throw new IOException( + "Incompatible Kafka connector. Use a Kafka connector that passes headers"); + } + + public long bytesToLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(bytes); + buffer.flip(); // need flip + return buffer.getLong(); + } + + public static byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(x); + return buffer.array(); + } + + /** + * Get the Avro schema using the Apicurio Registry. In order to call the registry, we need to + * get the globalId of the Avro Schema in the registry. If the format is configured to expect + * headers, then the globalId will be obtained from the headers map. Otherwise, the globalId is + * obtained from the message payload, depending on the format configuration. This format is only + * used with the Kafka connector. The Kafka connector lives in another repository. This + * repository does not have any Kafka dependencies; it is therefore not possible for us to use + * the Kafka consumer record here passing a deserialization handler. So we have to get the + * information we need directly from the headers or the payload itself. + * + * @param in input stream + * @param additionalParameters map of additional parameters - including the headers an whether + * this is a key or value + * @return the Avro Schema + * @throws IOException if there is an error + */ + @Override + public Schema readSchemaWithAdditionalParameters( + InputStream in, Map<String, Object> additionalParameters) throws IOException { + String methodName = "readSchemaWithAdditionalParameters"; + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " entered"); + } + if (in == null) { + return null; + } + boolean useGlobalId = (boolean) configs.get(USE_GLOBALID.key()); + + long schemaId = getSchemaId(in, additionalParameters, useGlobalId); + + if (!useGlobalId) { + // if we are using the content ID there is not get that will dereference. 
+ // so we need to get the artifact references, find the latest version, + // using its groupId and artifactId we can get the globalId, from which we can get + // a de-referenced schema + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " got using contentId " + schemaId); + } + List<ArtifactReference> artifactReferenceList = + registryClient.getArtifactReferencesByContentId(schemaId); + if (artifactReferenceList == null || artifactReferenceList.isEmpty()) { + throw new IOException( + "Could not find any schemas in the Apicurio Registry with content ID " + + schemaId); + } + long latestVersion = 0L; + String artifactId = ""; + String groupId = ""; + for (ArtifactReference artifactReference : artifactReferenceList) { + long version = Long.valueOf(artifactReference.getVersion()); + // TODO check Apicurio code to see how it checks for the latest version + if (version > latestVersion) { + latestVersion = version; + artifactId = artifactReference.getArtifactId(); + groupId = artifactReference.getGroupId(); + } + } + VersionMetaData artifactVersion = + registryClient.getArtifactVersionMetaData( + groupId, artifactId, latestVersion + ""); + schemaId = artifactVersion.getGlobalId(); + } + // get the schema in canonical form with references dereferenced + InputStream schemaInputStream = registryClient.getContentByGlobalId(schemaId, true, true); + Schema schema = new Schema.Parser().parse(schemaInputStream); + if (LOG.isDebugEnabled()) { + LOG.debug( + methodName + + " got schema " + + schema + + " using " + + (useGlobalId ? "global" : "content") + + " ID " + + schemaId); + } + return schema; + } + + /** + * Get the schema id. + * + * @param in the Kafka body as an input stream + * @param additionalParameters additional Pararamters containing the headers and whetehr we are + * processing a key or value + * @param useGlobalID hen true globalIDs are used, otherwise contentIDS + * @return the id of the schema + * @throws IOException error occurred + */ + protected long getSchemaId( + InputStream in, Map<String, Object> additionalParameters, boolean useGlobalID) + throws IOException { + long schemaId = 0L; + Map<String, Object> headers = (Map<String, Object>) additionalParameters.get(HEADERS); + boolean isKey = (boolean) additionalParameters.get(IS_KEY); + boolean useHeaders = (boolean) configs.get(USE_HEADERS.key()); Review Comment: null pointer if `.get()` returns null ########## flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java: ########## @@ -94,6 +95,31 @@ default void deserialize(byte[] message, Collector<T> out) throws IOException { */ boolean isEndOfStream(T nextElement); + /** + * Deserializes the byte message with input Additional Properties. + * + * @param message The message, as a byte array. + * @param inputAdditionalProperties inputAdditionalProperties map of input Additional Properties + * that can be used for deserialization. Override this method to make use of the + * inputAdditionalProperties, + * @return The deserialized message as an object (null if the message cannot be deserialized). 
+ */ + @PublicEvolving + default T deserializeWithAdditionalProperties( + byte[] message, Map<String, Object> inputAdditionalProperties) throws IOException { + return deserialize(message); + } + + @PublicEvolving + default void deserializeWithAdditionalProperties( + byte[] message, Map<String, Object> inputAdditionalProperties, Collector<T> out) + throws IOException { + T deserialize = deserializeWithAdditionalProperties(message, inputAdditionalProperties); Review Comment: Why does this default method not just call `deserialize(byte[] message, Collector<T> out)`? Is it a future proof to make sure that if `deserializeWithAdditionalProperties` default changes, we pick it up here? ########## flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java: ########## @@ -34,6 +35,20 @@ public interface SchemaCoder { void writeSchema(Schema schema, OutputStream out) throws IOException; + default void writeSchema( + Schema schema, + OutputStream out, + Map<String, Object> inputProperties, + Map<String, Object> outputProperties) + throws IOException { + throw new RuntimeException("writeSchema passing headers should be overridden."); Review Comment: why is this not calling `writeSchema(schema, out)` like you do with `readSchemaWithAdditionalParameters`? ########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.registry.apicurio; + +import org.apache.flink.formats.avro.AvroFormatOptions; +import org.apache.flink.formats.avro.SchemaCoder; + +import io.apicurio.registry.rest.client.RegistryClient; +import io.apicurio.registry.rest.v2.beans.ArtifactMetaData; +import io.apicurio.registry.rest.v2.beans.ArtifactReference; +import io.apicurio.registry.rest.v2.beans.IfExists; +import io.apicurio.registry.rest.v2.beans.VersionMetaData; +import io.apicurio.registry.types.ArtifactType; +import org.apache.avro.Schema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; + +/** Reads and Writes schema using Avro Schema Registry protocol. 
*/ +public class ApicurioSchemaRegistryCoder implements SchemaCoder { + + public static final String APICURIO_KEY_GLOBAL_ID_HEADER = "apicurio.key.globalId"; + + public static final String APICURIO_KEY_CONTENT_ID_HEADER = "apicurio.key.contentId"; + + public static final String APICURIO_VALUE_GLOBAL_ID_HEADER = "apicurio.value.globalId"; + + public static final String APICURIO_VALUE_CONTENT_ID_HEADER = "apicurio.value.contentId"; + + public static final String APICURIO_VALUE_ENCODING = "apicurio.value.encoding"; + + private final RegistryClient registryClient; + + private final Map<String, Object> configs; + + private static final int MAGIC_BYTE = 0; + + public static final String TOPIC_NAME = "TOPIC_NAME"; + + public static final String HEADERS = "HEADERS"; + + public static final String IS_KEY = "IS_KEY"; + + private static final Logger LOG = LoggerFactory.getLogger(ApicurioSchemaRegistryCoder.class); + + /** + * Creates {@link SchemaCoder} that uses provided {@link RegistryClient} to connect to schema + * registry. + * + * @param registryClient client to connect schema registry + * @param configs map for registry configs + */ + public ApicurioSchemaRegistryCoder(RegistryClient registryClient, Map<String, Object> configs) { + this.registryClient = registryClient; + this.configs = configs; + } + + @Override + public Schema readSchema(InputStream in) throws IOException { + throw new IOException( + "Incompatible Kafka connector. Use a Kafka connector that passes headers"); + } + + public long bytesToLong(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.put(bytes); + buffer.flip(); // need flip + return buffer.getLong(); + } + + public static byte[] longToBytes(long x) { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putLong(x); + return buffer.array(); + } + + /** + * Get the Avro schema using the Apicurio Registry. In order to call the registry, we need to + * get the globalId of the Avro Schema in the registry. If the format is configured to expect + * headers, then the globalId will be obtained from the headers map. Otherwise, the globalId is + * obtained from the message payload, depending on the format configuration. This format is only + * used with the Kafka connector. The Kafka connector lives in another repository. This + * repository does not have any Kafka dependencies; it is therefore not possible for us to use + * the Kafka consumer record here passing a deserialization handler. So we have to get the + * information we need directly from the headers or the payload itself. + * + * @param in input stream + * @param additionalParameters map of additional parameters - including the headers an whether + * this is a key or value + * @return the Avro Schema + * @throws IOException if there is an error + */ + @Override + public Schema readSchemaWithAdditionalParameters( + InputStream in, Map<String, Object> additionalParameters) throws IOException { + String methodName = "readSchemaWithAdditionalParameters"; + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " entered"); + } + if (in == null) { + return null; + } + boolean useGlobalId = (boolean) configs.get(USE_GLOBALID.key()); + + long schemaId = getSchemaId(in, additionalParameters, useGlobalId); + + if (!useGlobalId) { + // if we are using the content ID there is not get that will dereference. 
+ // so we need to get the artifact references, find the latest version, + // using its groupId and artifactId we can get the globalId, from which we can get + // a de-referenced schema + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " got using contentId " + schemaId); + } + List<ArtifactReference> artifactReferenceList = + registryClient.getArtifactReferencesByContentId(schemaId); + if (artifactReferenceList == null || artifactReferenceList.isEmpty()) { + throw new IOException( + "Could not find any schemas in the Apicurio Registry with content ID " + + schemaId); + } + long latestVersion = 0L; + String artifactId = ""; + String groupId = ""; + for (ArtifactReference artifactReference : artifactReferenceList) { + long version = Long.valueOf(artifactReference.getVersion()); + // TODO check Apicurio code to see how it checks for the latest version + if (version > latestVersion) { + latestVersion = version; + artifactId = artifactReference.getArtifactId(); + groupId = artifactReference.getGroupId(); + } + } + VersionMetaData artifactVersion = + registryClient.getArtifactVersionMetaData( + groupId, artifactId, latestVersion + ""); + schemaId = artifactVersion.getGlobalId(); + } + // get the schema in canonical form with references dereferenced + InputStream schemaInputStream = registryClient.getContentByGlobalId(schemaId, true, true); + Schema schema = new Schema.Parser().parse(schemaInputStream); + if (LOG.isDebugEnabled()) { + LOG.debug( + methodName + + " got schema " + + schema + + " using " + + (useGlobalId ? "global" : "content") + + " ID " + + schemaId); + } + return schema; + } + + /** + * Get the schema id. + * + * @param in the Kafka body as an input stream + * @param additionalParameters additional parameters containing the headers and whether we are + * processing a key or value + * @param useGlobalID when true globalIds are used, otherwise contentIds + * @return the id of the schema + * @throws IOException error occurred + */ + protected long getSchemaId( + InputStream in, Map<String, Object> additionalParameters, boolean useGlobalID) + throws IOException { + long schemaId = 0L; + Map<String, Object> headers = (Map<String, Object>) additionalParameters.get(HEADERS);

Review Comment: Why cast to `<String, Object>` when everything in the method accesses it as `<String, byte[]>` with extra casting?
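To make the comment concrete, the lookup could be typed once at the top of the method. A sketch only, and it assumes the connector really does put `byte[]` values into the `HEADERS` map (the unchecked cast cannot verify that at runtime; `globalIDHeaderName` is the header-name variable computed later in the method):

```java
// Declare the header value type once instead of re-casting every lookup.
@SuppressWarnings("unchecked")
Map<String, byte[]> headers = (Map<String, byte[]>) additionalParameters.get(HEADERS);

// later in the method, no per-entry cast is needed:
byte[] idBytes = headers.get(globalIDHeaderName);
if (idBytes != null) {
    schemaId = bytesToLong(idBytes);
}
```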
########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ + @Override + public Schema readSchemaWithAdditionalParameters( + InputStream in, Map<String, Object> additionalParameters) throws IOException { + String methodName = "readSchemaWithAdditionalParameters"; + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " entered"); + } + if (in == null) { + return null; + } + boolean useGlobalId = (boolean) configs.get(USE_GLOBALID.key());

Review Comment: Potential null pointer here if the map doesn't contain the key.
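One possible guard for the null pointer flagged here, assuming a missing key should fall back to the option's declared default rather than fail on unboxing (a sketch, not necessarily how the PR should resolve it):

```java
// configs.get(...) returns null when the key is absent, and unboxing a null
// Boolean throws NullPointerException; read the value defensively instead.
Object rawUseGlobalId = configs.get(USE_GLOBALID.key());
boolean useGlobalId =
        rawUseGlobalId != null ? (boolean) rawUseGlobalId : USE_GLOBALID.defaultValue();
```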
########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioRegistryAvroFormatFactory.java: ########## @@ -0,0 +1,289 @@ +import static 
org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SCHEMA; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_KEYSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_TRUSTSTORE_LOCATION; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.SSL_TRUSTSTORE_PASSWORD; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.URL; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_GLOBALID; +import static org.apache.flink.formats.avro.registry.apicurio.AvroApicurioFormatOptions.USE_HEADERS; + +/** Apicurio avro format. */ +@Internal +public class ApicurioRegistryAvroFormatFactory + implements DeserializationFormatFactory, SerializationFormatFactory { + + // used this key rather than apicurio-avro (the format name) to be consistent with Confluent. + public static final String IDENTIFIER = "avro-apicurio"; + + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + String schemaRegistryURL = formatOptions.get(URL); + Optional<String> schemaString = formatOptions.getOptional(SCHEMA); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + return new ProjectableDecodingFormat<DeserializationSchema<RowData>>() { + // -------------------------------------------------------------------------------------------- + // Mutable attributes + // -------------------------------------------------------------------------------------------- + + private List<String> metadataKeys; + + @Override + public DeserializationSchema<RowData> createRuntimeDecoder( + DynamicTableSource.Context context, + DataType producedDataType, + int[][] projections) { + producedDataType = Projection.of(projections).project(producedDataType); + final RowType rowType = (RowType) producedDataType.getLogicalType(); + // schema has been supplied + final Schema schema = + schemaString + .map(s -> getAvroSchema(s, rowType)) + .orElse(AvroSchemaConverter.convertToSchema(rowType)); + final TypeInformation<RowData> rowDataTypeInfo = + context.createTypeInformation(producedDataType); + return new AvroRowDataDeserializationSchema( + ApicurioRegistryAvroDeserializationSchema.forGeneric( + schema, schemaRegistryURL, optionalPropertiesMap), + AvroToRowDataConverters.createRowConverter(rowType), + rowDataTypeInfo); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat( + DynamicTableFactory.Context context, ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + String schemaRegistryURL = formatOptions.get(URL); + Optional<String> schemaString = formatOptions.getOptional(SCHEMA); + Map<String, ?> optionalPropertiesMap = buildOptionalPropertiesMap(formatOptions); + + return new EncodingFormat<SerializationSchema<RowData>>() { + @Override + public SerializationSchema<RowData> createRuntimeEncoder( + DynamicTableSink.Context context, DataType consumedDataType) { + final RowType rowType = (RowType) consumedDataType.getLogicalType(); + final 
Schema schema = + schemaString + .map(s -> getAvroSchema(s, rowType)) + .orElse(AvroSchemaConverter.convertToSchema(rowType)); + return new AvroRowDataSerializationSchema( + rowType, + ApicurioRegistryAvroSerializationSchema.forGeneric( + schema, schemaRegistryURL, optionalPropertiesMap), + RowDataToAvroConverters.createConverter(rowType)); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.insertOnly(); + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set<ConfigOption<?>> requiredOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(URL); + return options; + } + + @Override + public Set<ConfigOption<?>> optionalOptions() { + return getOptionalOptions(); + } + + protected static Set<ConfigOption<?>> getOptionalOptions() { + Set<ConfigOption<?>> options = new HashSet<>(); + options.add(USE_HEADERS); + options.add(USE_GLOBALID); + options.add(SCHEMA); + options.add(REGISTERED_ARTIFACT_NAME); + options.add(REGISTERED_ARTIFACT_DESCRIPTION); + options.add(REGISTERED_ARTIFACT_ID); + options.add(REGISTERED_ARTIFACT_VERSION); + options.add(PROPERTIES); + options.add(SSL_KEYSTORE_LOCATION); + options.add(SSL_KEYSTORE_PASSWORD); + options.add(SSL_TRUSTSTORE_LOCATION); + options.add(SSL_TRUSTSTORE_PASSWORD); + options.add(BASIC_AUTH_CREDENTIALS_USERID); + options.add(BASIC_AUTH_CREDENTIALS_PASSWORD); + options.add(OIDC_AUTH_URL); + options.add(OIDC_AUTH_CLIENT_ID); + options.add(OIDC_AUTH_CLIENT_SECRET); + options.add(OIDC_AUTH_SCOPE); + options.add(OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION); + + return options; + } + + @Override + public Set<ConfigOption<?>> forwardOptions() { + return Stream.of( + URL, + USE_HEADERS, + USE_GLOBALID, + SCHEMA, + REGISTERED_ARTIFACT_NAME, + REGISTERED_ARTIFACT_DESCRIPTION, + REGISTERED_ARTIFACT_ID, + REGISTERED_ARTIFACT_VERSION, + PROPERTIES, + SSL_KEYSTORE_LOCATION, + SSL_KEYSTORE_PASSWORD, + SSL_TRUSTSTORE_LOCATION, + SSL_TRUSTSTORE_PASSWORD, + BASIC_AUTH_CREDENTIALS_USERID, + BASIC_AUTH_CREDENTIALS_PASSWORD, + OIDC_AUTH_URL, + OIDC_AUTH_CLIENT_ID, + OIDC_AUTH_CLIENT_SECRET, + OIDC_AUTH_SCOPE, + OIDC_AUTH_TOKEN_EXPIRATION_REDUCTION) + .collect(Collectors.toSet()); + } + + public static @Nullable Map<String, Object> buildOptionalPropertiesMap( + ReadableConfig formatOptions) { + final Map<String, Object> properties = new HashMap<>(); + + Set<ConfigOption<?>> configOptionsArray = getOptionalOptions(); + + for (ConfigOption configOption : configOptionsArray) { + formatOptions + .getOptional(configOption) + .ifPresent(v -> properties.put(configOption.key(), v)); + // when there is no value we would like to set the default as the value. + // on Java 11 we could use orElse or isEmpty, but we are on Java 8 + String configOptionKey = configOption.key(); + if (properties.get(configOptionKey) == null && configOption.hasDefaultValue()) { + properties.put(configOptionKey, configOption.defaultValue()); + } + } + return properties; + } + + private static Schema getAvroSchema(String schemaString, RowType rowType) { + LogicalType convertedDataType =

Review Comment: Can this throw errors? I'd expect a parse error or equivalent checked exception?
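On this question: Avro's `Schema.Parser.parse` throws the unchecked `org.apache.avro.SchemaParseException` rather than a checked exception, so nothing in the signature forces handling. A sketch of an explicit translation, illustrative only, since the real method also converts the row type, which is elided here:

```java
// Translate Avro's unchecked parse failure into a descriptive error for users
// who supplied a bad 'schema' option. Simplified: validation of the parsed
// schema against rowType is omitted.
private static Schema getAvroSchema(String schemaString, RowType rowType) {
    try {
        return new Schema.Parser().parse(schemaString);
    } catch (org.apache.avro.SchemaParseException e) {
        throw new IllegalArgumentException(
                "The supplied Avro schema could not be parsed: " + e.getMessage(), e);
    }
}
```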
########## flink-formats/flink-avro-apicurio-registry/src/main/java/org/apache/flink/formats/avro/registry/apicurio/ApicurioSchemaRegistryCoder.java: ########## @@ -0,0 +1,379 @@ + protected long getSchemaId( + InputStream in, Map<String, Object> additionalParameters, boolean useGlobalID) + throws IOException { + long schemaId = 0L; + Map<String, Object> headers = (Map<String, Object>) additionalParameters.get(HEADERS); + boolean isKey = (boolean) additionalParameters.get(IS_KEY); + boolean useHeaders = (boolean) configs.get(USE_HEADERS.key()); + + if (useHeaders) { + String globalIDHeaderName = + isKey ? APICURIO_KEY_GLOBAL_ID_HEADER : APICURIO_VALUE_GLOBAL_ID_HEADER; + String contentIDHeaderName = + isKey ? APICURIO_KEY_CONTENT_ID_HEADER : APICURIO_VALUE_CONTENT_ID_HEADER; + // get from headers + if (headers.get(globalIDHeaderName) != null) { + if (!useGlobalID) { + throw new IOException( + "The message has a global ID header " + + globalIDHeaderName + + ", but the configuration is for content ID.\n" + + "Header names received: \n" + + headers.keySet().stream().collect(Collectors.joining(","))); + } + byte[] globalIDByteArray = (byte[]) headers.get(globalIDHeaderName); + schemaId = bytesToLong(globalIDByteArray); + } else if (headers.get(contentIDHeaderName) != null) { + if (useGlobalID) { + throw new IOException( + "The message has a content ID header " + + contentIDHeaderName + + "\n" + + ", but the configuration is for global ID.\n" + + "Header names received: " + + headers.keySet().stream().collect(Collectors.joining(","))); + } + byte[] contentIDByteArray = (byte[]) headers.get(contentIDHeaderName); + schemaId = bytesToLong(contentIDByteArray); + } else { + String headerName = useGlobalID ? globalIDHeaderName : contentIDHeaderName; + throw new IOException( + "The format was configured to enable headers, but the expected Kafka header " + + headerName + + " is not present.\n " + + "Header names received: " + + headers.keySet().stream().collect(Collectors.joining(",")) + + "\n " + + "Ensure that the kafka message was created by an Apicurio client. Check that it is " + + "sending the appropriate ID in the header, if the message is sending the ID in the payload,\n" + + " review and amend the format configuration options " + + USE_HEADERS + + " and " + + USE_GLOBALID); + } + } else { + // the id is in the payload + DataInputStream dataInputStream = new DataInputStream(in); + if (dataInputStream.readByte() != 0) { + throw new IOException( + "Unknown data format. Magic number was not found. " + + USE_HEADERS + + "= false" + + " configs=" + + configs); + } + schemaId = dataInputStream.readLong(); + } + return schemaId; + } + + @Override + public void writeSchema(Schema schema, OutputStream out) throws IOException { + throw new IOException( + "Incompatible Kafka connector is being used, upgrade the Kafka connector."); + } + + /** + * The Avro schema will be written depending on the configuration. + * + * @param schema Avro Schema to write into the output stream. + * @param out the output stream where the schema is serialized. 
+ * @param inputProperties properties containing the topic name + * @param outputProperties headers to populate + * @throws IOException if an exception has occurred + */ + @Override + public void writeSchema( + Schema schema, + OutputStream out, + Map<String, Object> inputProperties, + Map<String, Object> outputProperties) + throws IOException { + String methodName = "writeSchema with additional properties"; + if (LOG.isDebugEnabled()) { + LOG.debug(methodName + " entered"); + } + + boolean useHeaders = (boolean) configs.get(USE_HEADERS.key()); + boolean useGlobalId = (boolean) configs.get(USE_GLOBALID.key()); + boolean isKey = (boolean) inputProperties.get(IS_KEY); + + String groupId = (String) configs.get(AvroApicurioFormatOptions.GROUP_ID.key()); + Object artifactId = configs.get(AvroApicurioFormatOptions.REGISTERED_ARTIFACT_ID.key()); + Object artifactName = configs.get(AvroApicurioFormatOptions.REGISTERED_ARTIFACT_NAME.key()); + if (inputProperties.get(TOPIC_NAME) == null) { + String configOptionName = null; + if (artifactId == null) { + configOptionName = "artifactId"; + } + if (artifactName == null) { + configOptionName = "artifactName"; + } + if (configOptionName != null) { + throw new IOException( + "No topic name was configured, and it is required to provide a default for " + + configOptionName); + } + } + String defaultArtifactName = + (String) inputProperties.get(TOPIC_NAME + (isKey ? "-key" : "-value")); + artifactId = (artifactId == null) ? defaultArtifactName : artifactId; + artifactName = (artifactName == null) ? defaultArtifactName : artifactName; + + // config option has a default so no need to check for null + String artifactDescription = + (String)

Review Comment: More occurrences of null pointers when casting from `.get()`
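The repeated `(cast) map.get(key)` lookups in this method could be funnelled through a single defensive accessor; a sketch, assuming a missing key should surface as a configuration error rather than a `NullPointerException` or `ClassCastException` (the helper name is hypothetical):

```java
// Hypothetical helper for the recurring lookup-and-cast pattern in this class.
private static <T> T requireConfig(Map<String, Object> configs, String key, Class<T> type)
        throws IOException {
    Object value = configs.get(key);
    if (value == null) {
        throw new IOException("Missing required format option: " + key);
    }
    if (!type.isInstance(value)) {
        throw new IOException(
                "Format option " + key + " has type " + value.getClass().getName()
                        + ", expected " + type.getName());
    }
    return type.cast(value);
}

// Example usage inside writeSchema:
// boolean useHeaders = requireConfig(configs, USE_HEADERS.key(), Boolean.class);
```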
