hhkkxxx133 commented on a change in pull request #17360:
URL: https://github.com/apache/flink/pull/17360#discussion_r718011735
##########
File path: docs/data/sql_connectors.yml
##########
@@ -48,6 +48,12 @@ avro-confluent:
category: format
sql_url:
https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro-confluent-registry/$version/flink-sql-avro-confluent-registry-$version.jar
+avro-glue:
+ name: Avro Schema Registry
Review comment:
Can the name be distinguished from Confluent? For example Avro Glue
Schema Registry?
##########
File path:
flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.flink.formats.avro.glue.schema.registry;
+
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AUTO_REGISTRATION;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_REGION;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_SIZE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_TTL_MS;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.COMPATIBILITY;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.COMPRESSION_TYPE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.ENDPOINT;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.RECORD_TYPE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.REGISTRY_NAME;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_REGISTRY_SUBJECT;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Table format factory for providing configured instances of AWS Glue Schema
+ * Registry Avro to RowData {@link SerializationSchema} and
+ * {@link DeserializationSchema}.
+ */
+@Internal
+public class GlueSchemaRegistryAvroFormatFactory implements
DeserializationFormatFactory, SerializationFormatFactory {
+ public static final String IDENTIFIER = "avro-glue";
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>>
createDecodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ final Map<String, Object> configMap = buildConfigMap(formatOptions);
+
+ return new DecodingFormat<DeserializationSchema<RowData>>() {
+ @Override
+ public DeserializationSchema<RowData>
createRuntimeDecoder(DynamicTableSource.Context context,
+ DataType producedDataType) {
+ final RowType rowType = (RowType)
producedDataType.getLogicalType();
+ final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
+ return new AvroRowDataDeserializationSchema(
+ GlueSchemaRegistryAvroDeserializationSchema
+
.forGeneric(AvroSchemaConverter.convertToSchema(rowType), configMap),
+ AvroToRowDataConverters.createRowConverter(rowType),
rowDataTypeInfo);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.all();
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat<SerializationSchema<RowData>>
createEncodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ return new EncodingFormat<SerializationSchema<RowData>>() {
+ @Override
+ public SerializationSchema<RowData>
createRuntimeEncoder(DynamicTableSink.Context context,
+ DataType consumedDataType) {
+ final RowType rowType = (RowType)
consumedDataType.getLogicalType();
+ return new AvroRowDataSerializationSchema(rowType,
+ GlueSchemaRegistryAvroSerializationSchema.forGeneric(
+ AvroSchemaConverter.convertToSchema(rowType),
+ formatOptions.get(SCHEMA_REGISTRY_SUBJECT),
buildConfigMap(formatOptions)),
+ RowDataToAvroConverters.createConverter(rowType));
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.all();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ private Map<String, Object> buildConfigMap(ReadableConfig formatOptions) {
+ final Map<String, Object> properties = new HashMap<String, Object>();
+ formatOptions.getOptional(AWS_REGION).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AWS_REGION, v));
+ formatOptions.getOptional(REGISTRY_NAME).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v));
+ formatOptions.getOptional(RECORD_TYPE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, v));
+ formatOptions.getOptional(COMPRESSION_TYPE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v));
+ formatOptions.getOptional(ENDPOINT).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, v));
+ formatOptions.getOptional(COMPATIBILITY).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v));
+ formatOptions.getOptional(AUTO_REGISTRATION).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, v));
+ formatOptions.getOptional(CACHE_SIZE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v));
+ formatOptions.getOptional(CACHE_TTL_MS).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
+ formatOptions.getOptional(CACHE_TTL_MS).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
Review comment:
Duplicate?
##########
File path:
flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroFormatFactory.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.flink.formats.avro.glue.schema.registry;
+
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AUTO_REGISTRATION;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.AWS_REGION;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_SIZE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.CACHE_TTL_MS;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.COMPATIBILITY;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.COMPRESSION_TYPE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.ENDPOINT;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.RECORD_TYPE;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.REGISTRY_NAME;
+import static
org.apache.flink.formats.avro.glue.schema.registry.AvroGlueFormatOptions.SCHEMA_REGISTRY_SUBJECT;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema;
+import org.apache.flink.formats.avro.AvroRowDataSerializationSchema;
+import org.apache.flink.formats.avro.AvroToRowDataConverters;
+import org.apache.flink.formats.avro.RowDataToAvroConverters;
+import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Table format factory for providing configured instances of AWS Glue Schema
+ * Registry Avro to RowData {@link SerializationSchema} and
+ * {@link DeserializationSchema}.
+ */
+@Internal
+public class GlueSchemaRegistryAvroFormatFactory implements
DeserializationFormatFactory, SerializationFormatFactory {
+ public static final String IDENTIFIER = "avro-glue";
+
+ @Override
+ public DecodingFormat<DeserializationSchema<RowData>>
createDecodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+ final Map<String, Object> configMap = buildConfigMap(formatOptions);
+
+ return new DecodingFormat<DeserializationSchema<RowData>>() {
+ @Override
+ public DeserializationSchema<RowData>
createRuntimeDecoder(DynamicTableSource.Context context,
+ DataType producedDataType) {
+ final RowType rowType = (RowType)
producedDataType.getLogicalType();
+ final TypeInformation<RowData> rowDataTypeInfo =
context.createTypeInformation(producedDataType);
+ return new AvroRowDataDeserializationSchema(
+ GlueSchemaRegistryAvroDeserializationSchema
+
.forGeneric(AvroSchemaConverter.convertToSchema(rowType), configMap),
+ AvroToRowDataConverters.createRowConverter(rowType),
rowDataTypeInfo);
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.upsert();
+ }
+ };
+ }
+
+ @Override
+ public EncodingFormat<SerializationSchema<RowData>>
createEncodingFormat(DynamicTableFactory.Context context,
+ ReadableConfig formatOptions) {
+ FactoryUtil.validateFactoryOptions(this, formatOptions);
+
+ return new EncodingFormat<SerializationSchema<RowData>>() {
+ @Override
+ public SerializationSchema<RowData>
createRuntimeEncoder(DynamicTableSink.Context context,
+ DataType consumedDataType) {
+ final RowType rowType = (RowType)
consumedDataType.getLogicalType();
+ return new AvroRowDataSerializationSchema(rowType,
+ GlueSchemaRegistryAvroSerializationSchema.forGeneric(
+ AvroSchemaConverter.convertToSchema(rowType),
+ formatOptions.get(SCHEMA_REGISTRY_SUBJECT),
buildConfigMap(formatOptions)),
+ RowDataToAvroConverters.createConverter(rowType));
+ }
+
+ @Override
+ public ChangelogMode getChangelogMode() {
+ return ChangelogMode.upsert();
+ }
+ };
+ }
+
+ @Override
+ public String factoryIdentifier() {
+ return IDENTIFIER;
+ }
+
+ private Map<String, Object> buildConfigMap(ReadableConfig formatOptions) {
+ final Map<String, Object> properties = new HashMap<String, Object>();
+ formatOptions.getOptional(AWS_REGION).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AWS_REGION, v));
+ formatOptions.getOptional(REGISTRY_NAME).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, v));
+ formatOptions.getOptional(RECORD_TYPE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, v));
+ formatOptions.getOptional(COMPRESSION_TYPE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.COMPRESSION_TYPE, v));
+ formatOptions.getOptional(ENDPOINT).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.AWS_ENDPOINT, v));
+ formatOptions.getOptional(COMPATIBILITY).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, v));
+ formatOptions.getOptional(AUTO_REGISTRATION).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, v));
+ formatOptions.getOptional(CACHE_SIZE).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_SIZE, v));
+ formatOptions.getOptional(CACHE_TTL_MS).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
+ formatOptions.getOptional(CACHE_TTL_MS).ifPresent(v ->
properties.put(AWSSchemaRegistryConstants.CACHE_TIME_TO_LIVE_MILLIS, v));
+ return properties;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> result = new HashSet<>();
+ result.add(REGISTRY_NAME);
+ result.add(AWS_REGION);
+ result.add(SCHEMA_REGISTRY_SUBJECT);
+ return result;
+ }
Review comment:
I agree. Looks good to me.
##########
File path: docs/content/docs/connectors/table/formats/avro-glue.md
##########
@@ -0,0 +1,191 @@
+---
+title: Confluent Avro
+weight: 4
+type: docs
+aliases:
+ - /dev/table/connectors/formats/avro-confluent.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Confluent Avro Format
Review comment:
Should be updated as well to indicate Glue Schema Registry.
##########
File path: docs/content/docs/connectors/table/formats/avro-glue.md
##########
@@ -0,0 +1,191 @@
+---
+title: Confluent Avro
Review comment:
Title is incorrectly referring to "Confluent".
##########
File path: docs/content/docs/connectors/table/formats/avro-glue.md
##########
@@ -0,0 +1,191 @@
+---
+title: Confluent Avro
+weight: 4
+type: docs
+aliases:
+ - /dev/table/connectors/formats/avro-confluent.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Confluent Avro Format
+
+{{< label "Format: Serialization Schema" >}}
+{{< label "Format: Deserialization Schema" >}}
+
+The Glue Schema Registry (``avro-glue``) format allows you to read records
that were serialized by the
``com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer``
and to write records that can in turn be read by the
``com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer``.
These records have their schemas stored out-of-band in a configured registry
provided by the AWS Glue Schema Registry
[service](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas).
+
+When reading (deserializing) a record with this format the Avro writer schema
is fetched from the configured AWS Glue Schema Registry based on the schema
version id encoded in the record while the reader schema is inferred from table
schema.
+
+When writing (serializing) a record with this format the Avro schema is
inferred from the table schema and used to retrieve a schema id to be encoded
with the data. The lookup is performed against the configured AWS Glue Schema
Registry under the
[subject](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html#schema-registry-schemas)
given in `avro-glue.subject`.
+
+The Avro Glue Schema Registry format can only be used in conjunction with the
[Apache Kafka SQL connector]({{< ref "docs/connectors/table/kafka" >}}) or the
[Upsert Kafka SQL Connector]({{< ref "docs/connectors/table/upsert-kafka" >}}).
+
+Dependencies
+------------
+
+{{< sql_download_table "avro-glue" >}}
+
+How to create tables with Avro-Glue format
+--------------
+
+Example of a table using raw UTF-8 string as Kafka key and Avro records
registered in the Schema Registry as Kafka values:
+
+```sql
+CREATE TABLE user_created (
+
+ -- one column mapped to the Kafka raw UTF-8 key
+ the_kafka_key STRING,
+
+ -- a few columns mapped to the Avro fields of the Kafka value
+ id STRING,
+ name STRING,
+ email STRING
+
+) WITH (
+
+ 'connector' = 'kafka',
+ 'topic' = 'user_events_example1',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+
+ -- UTF-8 string as Kafka keys, using the 'the_kafka_key' table column
+ 'key.format' = 'raw',
+ 'key.fields' = 'the_kafka_key',
+
+ 'value.format' = 'avro-glue',
+ 'value.avro-glue.region' = 'us-east-1',
+ 'value.avro-glue.registry.name' = 'my-schema-registry',
+ 'value.avro-glue.subject' = 'my-schema-name',
+ 'value.fields-include' = 'EXCEPT_KEY'
+)
+```
+
+Format Options
+----------------
+
+Yes, these options have inconsistent naming convnetions. No, I can't fix it.
This is for consistentcy with the existing [AWS Glue client
code](https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java#L20).
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what format to use, here should be
<code>'avro-glue'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>avro-glue.region</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what AWS region to use, such as
<code>'us-east-1'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>avro-glue.registry.name</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The name (not the ARN) of the Glue schema registry in which to
store the schemas.</td>
+ </tr>
+ <tr>
+ <td><h5>avro-glue.subject</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The subject name under which to store the schema in the
registry.</td>
Review comment:
Can this be named to `avro-glue.schema.name`?
##########
File path:
flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/AvroGlueFormatOptions.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.flink.formats.avro.glue.schema.registry;
+
+import java.time.Duration;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import
com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+import software.amazon.awssdk.services.glue.model.Compatibility;
+
+@PublicEvolving
+public class AvroGlueFormatOptions {
+ public static final String PREFIX = "schema-registry.";
+
+ public static final ConfigOption<String> AWS_REGION =
ConfigOptions.key(PREFIX + AWSSchemaRegistryConstants.AWS_REGION)
+ .stringType().noDefaultValue().withDescription("AWS region");
+
+ public static final ConfigOption<String> REGISTRY_NAME =
ConfigOptions.key(PREFIX + AWSSchemaRegistryConstants.REGISTRY_NAME)
+ .stringType().noDefaultValue().withDescription("Registry name");
+
+ public static final ConfigOption<String> RECORD_TYPE = ConfigOptions
+ .key(PREFIX +
AWSSchemaRegistryConstants.AVRO_RECORD_TYPE).stringType()
+
.defaultValue(AvroRecordType.GENERIC_RECORD.getName()).withDescription("Record
type");
+
+ public static final ConfigOption<String> SCHEMA_REGISTRY_SUBJECT =
ConfigOptions.key(PREFIX + "subject").stringType()
Review comment:
Same as above: can this be named to `SCHEMA_NAME`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]