dannycranmer commented on a change in pull request #14737: URL: https://github.com/apache/flink/pull/14737#discussion_r577638986
########## File path: flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.glue.schema.registry.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult; +import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput; +import com.amazonaws.services.schemaregistry.common.AWSSerializerInput; +import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializer; +import com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple client to publish and retrieve messages, using the AWS Kinesis SDK, Flink Kinesis + * Connectors and Glue Schema Registry classes. 
+ */ +public class GSRKinesisPubsubClient { + private static final Logger LOG = LoggerFactory.getLogger(GSRKinesisPubsubClient.class); + + private final AmazonKinesis kinesisClient; + private final Properties properties; + + public GSRKinesisPubsubClient(Properties properties) { + this.kinesisClient = createClientWithCredentials(properties); + this.properties = properties; + } + + public void createStream(String stream, int shards, Properties props) throws Exception { + try { + kinesisClient.describeStream(stream); + kinesisClient.deleteStream(stream); + } catch (ResourceNotFoundException rnfe) { + } + + kinesisClient.createStream(stream, shards); + Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5)); + while (deadline.hasTimeLeft()) { + try { + Thread.sleep(250); + if (kinesisClient.describeStream(stream).getStreamDescription().getShards().size() + != shards) { + continue; + } + break; + } catch (ResourceNotFoundException rnfe) { Review comment: nit: Add a comment to say this is expected/ok. At a glance it looks like it is missing error handling. ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoderProvider.java ########## @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.avro.glue.schema.registry; + +import org.apache.flink.formats.avro.SchemaCoder; + +import lombok.NonNull; Review comment: Yes, I meant you are using `lombok` rather than `javax.annotation`. But I think this is not needed since Flink coding standards say everything is `nonnull` by default: https://flink.apache.org/contributing/code-style-and-quality-common.html#nullability-of-the-mutable-parts This also raises the question of why the `transportName` is not `@NonNull`. Is it `@Nullable`? Please remove the annotations unless there is a good reason to keep them.
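For illustration, a minimal sketch of the constructor without the lombok annotation, following the linked Flink convention: parameters are treated as non-null by default, and `@Nullable` marks the one place where null would be legal. The field names and overall class shape here are assumptions for the sketch, not the code under review:

```java
import javax.annotation.Nullable;

import java.util.Map;

public class GlueSchemaRegistryAvroSchemaCoderProvider {
    private final String transportName;
    private final Map<String, Object> configs;

    // No @NonNull needed: per the Flink code style, parameters are non-null
    // unless annotated otherwise. @Nullable here documents the (assumed) case
    // where a null transportName is actually a legal value.
    public GlueSchemaRegistryAvroSchemaCoderProvider(
            @Nullable String transportName, Map<String, Object> configs) {
        this.transportName = transportName;
        this.configs = configs;
    }
}
```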
########## File path: flink-end-to-end-tests/test-scripts/test_glue_schema_registry.sh ########## @@ -0,0 +1,72 @@ +#!/usr/bin/env bash Review comment: https://issues.apache.org/jira/browse/FLINK-21391 ########## File path: flink-formats/flink-avro-glue-schema-registry/pom.xml ########## @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-formats</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-avro-glue-schema-registry</artifactId> + <name>Flink : Formats : Avro AWS Glue Schema Registry</name> + <packaging>jar</packaging> + + <properties> + <glue.schema.registry.version>1.0.0</glue.schema.registry.version> + <junit.jupiter.version>5.6.2</junit.jupiter.version> + <enforcer.skip>true</enforcer.skip> + </properties> + + <dependencies> Review comment: This comment looks misplaced. This is not related to the enforcer skip; it is looking at the transitive dependency chain. Did you reply to the wrong thread? ########## File path: flink-end-to-end-tests/flink-glue-schema-registry-test/pom.xml ########## @@ -0,0 +1,252 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-glue-schema-registry-test</artifactId> + <name>Flink : E2E Tests : AWS Glue Schema Registry</name> + <packaging>jar</packaging> + + <properties> + <httpclient.version>4.5.3</httpclient.version> + <httpcore.version>4.4.6</httpcore.version> + <aws.sdk.version>1.11.754</aws.sdk.version> + <aws.sdkv2.version>2.15.32</aws.sdkv2.version> + </properties> + + <!-- ============================= --> + <!-- DEPENDENCY MANAGEMENT --> + <!-- ============================= --> + <dependencyManagement> Review comment: Why is there a dependency management block here? Unless I am mistaken, `dependencyManagement` is used to set up dependencies for child modules that include this pom as their parent. How is this being used in this context?
########## File path: flink-formats/flink-avro-glue-schema-registry/pom.xml ########## @@ -0,0 +1,99 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-formats</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-avro-glue-schema-registry</artifactId> + <name>Flink : Formats : Avro AWS Glue Schema Registry</name> + <packaging>jar</packaging> + + <properties> + <glue.schema.registry.version>1.0.0</glue.schema.registry.version> + <junit.jupiter.version>5.6.2</junit.jupiter.version> + <enforcer.skip>true</enforcer.skip> + </properties> + + <dependencies> + <!-- core dependencies --> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-avro</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> + <version>${project.version}</version> + </dependency> Review comment: What is fixed? I do not see any changes? ########## File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryAvroSchemaCoder.java ########## @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.avro.glue.schema.registry; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.formats.avro.SchemaCoder; + +import org.apache.avro.Schema; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.Map; + +/** + * Schema coder that allows reading schema that is somehow embedded into serialized record. Used by + * {@link GlueSchemaRegistryAvroDeserializationSchema} and {@link + * GlueSchemaRegistryAvroSerializationSchema}. + */ +public class GlueSchemaRegistryAvroSchemaCoder implements SchemaCoder { + private GlueSchemaRegistryInputStreamDeserializer glueSchemaRegistryInputStreamDeserializer; + private GlueSchemaRegistryOutputStreamSerializer glueSchemaRegistryOutputStreamSerializer; + + /** + * Constructor accepts transport name and configuration map for AWS Glue Schema Registry. + * + * @param transportName topic name or stream name etc. + * @param configs configurations for AWS Glue Schema Registry + */ + public GlueSchemaRegistryAvroSchemaCoder( + final String transportName, final Map<String, Object> configs) { + glueSchemaRegistryInputStreamDeserializer = + new GlueSchemaRegistryInputStreamDeserializer(configs); + glueSchemaRegistryOutputStreamSerializer = + new GlueSchemaRegistryOutputStreamSerializer(transportName, configs); + } + + @VisibleForTesting + protected GlueSchemaRegistryAvroSchemaCoder( + final GlueSchemaRegistryInputStreamDeserializer + glueSchemaRegistryInputStreamDeserializer) { + this.glueSchemaRegistryInputStreamDeserializer = glueSchemaRegistryInputStreamDeserializer; + } + + @VisibleForTesting + protected GlueSchemaRegistryAvroSchemaCoder( + final GlueSchemaRegistryOutputStreamSerializer + glueSchemaRegistryOutputStreamSerializer) { + this.glueSchemaRegistryOutputStreamSerializer = glueSchemaRegistryOutputStreamSerializer; + } + + @Override + public Schema readSchema(InputStream in) throws IOException { + return glueSchemaRegistryInputStreamDeserializer.getSchemaAndDeserializedStream(in); + } + + @Override + public void writeSchema(Schema schema, OutputStream out) throws IOException { + byte[] data = ((ByteArrayOutputStream) out).toByteArray(); Review comment: OK, please add the `Preconditions` check.
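One possible shape for that check, sketched as a standalone helper on top of Flink's `org.apache.flink.util.Preconditions`; the helper name and the message wording are illustrative, not part of the PR:

```java
import org.apache.flink.util.Preconditions;

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

final class StreamPreconditions {
    private StreamPreconditions() {}

    // Fails fast with a descriptive message instead of letting the downcast in
    // writeSchema() surface as an opaque ClassCastException.
    static ByteArrayOutputStream requireByteArrayOutputStream(OutputStream out) {
        Preconditions.checkArgument(
                out instanceof ByteArrayOutputStream,
                "Expected a ByteArrayOutputStream, but was: %s",
                out.getClass().getName());
        return (ByteArrayOutputStream) out;
    }
}
```

`writeSchema` would then call `requireByteArrayOutputStream(out).toByteArray()` in place of the unchecked cast.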
########## File path: flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryExample.java ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.glue.schema.registry.test; + +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema; +import org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; +import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil; + +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import java.io.File; +import java.io.IOException; +import java.net.URL; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * A simple example that shows how to read from and write to Kinesis. This will read Avro messages + * from the input topic, and finally write back to another topic. Review comment: nit: Kinesis streams; topics are a Kafka thing
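One possible rewording of the quoted Javadoc along the lines of this nit:

```java
/**
 * A simple example that shows how to read from and write to Kinesis. This will read Avro messages
 * from the input stream, and finally write back to another stream.
 */
```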
########## File path: flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java ########## @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.glue.schema.registry.test; + +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult; +import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record; +import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; +import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle; +import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; +import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.kinesis.AmazonKinesis; +import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; +import com.amazonaws.services.kinesis.model.PutRecordRequest; +import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput; +import com.amazonaws.services.schemaregistry.common.AWSSerializerInput; +import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializer; +import com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer; +import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants; +import com.amazonaws.services.schemaregistry.utils.AvroRecordType; +import org.apache.avro.generic.GenericRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; + +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; + +/** + * Simple client to publish and retrieve messages, using the AWS Kinesis SDK, Flink Kinesis + * Connectors and Glue Schema Registry classes. + */ +public class GSRKinesisPubsubClient { + private static final Logger LOG = LoggerFactory.getLogger(GSRKinesisPubsubClient.class); + + private final AmazonKinesis kinesisClient; + private final Properties properties; + + public GSRKinesisPubsubClient(Properties properties) { + this.kinesisClient = createClientWithCredentials(properties); + this.properties = properties; + } + + public void createStream(String stream, int shards, Properties props) throws Exception { + try { + kinesisClient.describeStream(stream); + kinesisClient.deleteStream(stream); + } catch (ResourceNotFoundException rnfe) { Review comment: nit: Add a comment to say this is expected/ok. At a glance it looks like it is missing error handling.
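A sketch of the kind of comment the nit asks for, added directly to the empty catch block from the quoted diff:

```java
try {
    kinesisClient.describeStream(stream);
    kinesisClient.deleteStream(stream);
} catch (ResourceNotFoundException rnfe) {
    // Expected on a fresh run: the stream does not exist yet, so there is
    // nothing to delete and we can safely continue to create it below.
}
```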
########## File path: flink-end-to-end-tests/flink-glue-schema-registry-test/pom.xml ########## @@ -0,0 +1,252 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>flink-end-to-end-tests</artifactId> + <groupId>org.apache.flink</groupId> + <version>1.13-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>flink-glue-schema-registry-test</artifactId> + <name>Flink : E2E Tests : AWS Glue Schema Registry</name> + <packaging>jar</packaging> + + <properties> + <httpclient.version>4.5.3</httpclient.version> + <httpcore.version>4.4.6</httpcore.version> + <aws.sdk.version>1.11.754</aws.sdk.version> + <aws.sdkv2.version>2.15.32</aws.sdkv2.version> + </properties> + + <!-- ============================= --> + <!-- DEPENDENCY MANAGEMENT --> + <!-- ============================= --> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>software.amazon.awssdk</groupId> + <artifactId>http-client-spi</artifactId> + <version>2.15.32</version> Review comment: Update the versions to use the property rather than hard-coding them. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
