Airblader commented on a change in pull request #17360: URL: https://github.com/apache/flink/pull/17360#discussion_r716413338
########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GluSchemaRegistryAvroFormatFactory.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AUTO_REGISTRATION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AWS_REGION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_SIZE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_TTL_MS; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPATIBILITY; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPRESSION_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.ENDPOINT; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.RECORD_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.REGISTRY_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; 
+import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +/** + * Table format factory for providing configured instances of AWS Glue Schema + * Registry Avro to RowData {@link SerializationSchema} and + * {@link DeserializationSchema}. + */ +public class GluSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { Review comment: Please mark this as `@Internal`. ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroOptions.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import java.time.Duration; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import software.amazon.awssdk.services.glue.model.Compatibility; + +public class GlueSchemaRegistryAvroOptions { Review comment: Please follow the naming scheme here and name this `AvroGlueFormatOptions`. Also, this needs to be marked `@PublicEvolving`. ########## File path: flink-formats/flink-avro-glue-schema-registry/pom.xml ########## @@ -178,5 +192,33 @@ under the License. <version>${glue.schema.registry.version}</version> </dependency> + <!-- test dependencies --> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-api-java</artifactId> + <version>${project.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-table-runtime-blink_${scala.binary.version}</artifactId> Review comment: This module no longer exists. The legacy planner has been removed, so the Blink planner is now the only planner and the `-blink` suffix has been dropped from the module name. ```suggestion <artifactId>flink-table-runtime_${scala.binary.version}</artifactId> ``` ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroOptions.java ########## @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import java.time.Duration; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants.COMPRESSION; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +import software.amazon.awssdk.services.glue.model.Compatibility; + +public class GlueSchemaRegistryAvroOptions { + private GlueSchemaRegistryAvroOptions() { Review comment: nit: please move the constructor to the end of the file (static constants come before the constructor). ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GluSchemaRegistryAvroFormatFactory.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AUTO_REGISTRATION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AWS_REGION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_SIZE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_TTL_MS; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPATIBILITY; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPRESSION_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.ENDPOINT; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.RECORD_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.REGISTRY_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; + +import 
org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +/** + * Table format factory for providing configured instances of AWS Glue Schema + * Registry Avro to RowData {@link SerializationSchema} and + * {@link DeserializationSchema}. + */ +public class GluSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { Review comment: There's a typo (glu → glue) in this class name. 
########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GluSchemaRegistryAvroFormatFactory.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AUTO_REGISTRATION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AWS_REGION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_SIZE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_TTL_MS; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPATIBILITY; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPRESSION_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.ENDPOINT; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.RECORD_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.REGISTRY_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; 
+import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +/** + * Table format factory for providing configured instances of AWS Glue Schema + * Registry Avro to RowData {@link SerializationSchema} and + * {@link DeserializationSchema}. + */ +public class GluSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { + public static final String IDENTIFIER = "avro-glue"; + + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, + ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + var configMap = buildConfigMap(formatOptions); + + return new DecodingFormat<DeserializationSchema<RowData>>() { + @Override + public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, + DataType producedDataType) { + final RowType rowType = (RowType) producedDataType.getLogicalType(); + final TypeInformation<RowData> rowDataTypeInfo = context.createTypeInformation(producedDataType); + return new AvroRowDataDeserializationSchema( + GlueSchemaRegistryAvroDeserializationSchema + .forGeneric(AvroSchemaConverter.convertToSchema(rowType), configMap), + AvroToRowDataConverters.createRowConverter(rowType), rowDataTypeInfo); + } + + @Override + public 
ChangelogMode getChangelogMode() { + return ChangelogMode.upsert(); + } + }; + } + + @Override + public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(DynamicTableFactory.Context context, + ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + + return new EncodingFormat<SerializationSchema<RowData>>() { + @Override + public SerializationSchema<RowData> createRuntimeEncoder(DynamicTableSink.Context context, + DataType consumedDataType) { + final RowType rowType = (RowType) consumedDataType.getLogicalType(); + return new AvroRowDataSerializationSchema(rowType, + GlueSchemaRegistryAvroSerializationSchema.forGeneric( + AvroSchemaConverter.convertToSchema(rowType), + formatOptions.get(SCHEMA_REGISTRY_SUBJECT), buildConfigMap(formatOptions)), + RowDataToAvroConverters.createConverter(rowType)); + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.upsert(); + } + }; + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + private Map<String, Object> buildConfigMap(ReadableConfig formatOptions) { + var properties = new HashMap<String, Object>(); Review comment: Here, too, we cannot use `var`. ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GluSchemaRegistryAvroFormatFactory.java ########## @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.flink.formats.avro.glue.schema.registry; + +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AUTO_REGISTRATION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.AWS_REGION; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_SIZE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.CACHE_TTL_MS; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPATIBILITY; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.COMPRESSION_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.ENDPOINT; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.RECORD_TYPE; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.REGISTRY_NAME; +import static org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroOptions.SCHEMA_REGISTRY_SUBJECT; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; + +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import 
org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.avro.AvroRowDataDeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDataSerializationSchema; +import org.apache.flink.formats.avro.AvroToRowDataConverters; +import org.apache.flink.formats.avro.RowDataToAvroConverters; +import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.format.DecodingFormat; +import org.apache.flink.table.connector.format.EncodingFormat; +import org.apache.flink.table.connector.sink.DynamicTableSink; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.factories.DeserializationFormatFactory; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.SerializationFormatFactory; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.RowType; + +/** + * Table format factory for providing configured instances of AWS Glue Schema + * Registry Avro to RowData {@link SerializationSchema} and + * {@link DeserializationSchema}. + */ +public class GluSchemaRegistryAvroFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory { + public static final String IDENTIFIER = "avro-glue"; + + @Override + public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, + ReadableConfig formatOptions) { + FactoryUtil.validateFactoryOptions(this, formatOptions); + var configMap = buildConfigMap(formatOptions); Review comment: We're on Java 8 and cannot use `var`. Please use `final Map<…>`. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
