dannycranmer commented on a change in pull request #16513:
URL: https://github.com/apache/flink/pull/16513#discussion_r691218793



##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/generic/GlueSchemaRegistryGenericKinesisExample.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test.json.generic;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import 
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonDeserializationSchema;
+import 
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import 
com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This 
will read JSON messages
+ * from the input stream, and finally write back to another stream.
+ *
+ * <p>Example usage: --input-stream test-input --output-stream test-output 
--aws.endpoint
+ * https://localhost:4567 --flink.stream.initpos TRIM_HORIZON
+ */
+public class GlueSchemaRegistryGenericKinesisExample {

Review comment:
       Where/how is this class used? Was it originally used by the `sh` e2e 
tests? Can it be removed?

##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import 
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
+import 
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.tests.util.categories.TravisGroup1;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
+
+/** End-to-end test for Glue Schema Registry AVRO format using Kinesalite. */
+@Category(value = {TravisGroup1.class})
+public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger {
+    private static final String INPUT_STREAM = "gsr_avro_input_stream";
+    private static final String OUTPUT_STREAM = "gsr_avro_output_stream";
+    private static final String INTER_CONTAINER_KINESALITE_ALIAS = 
"kinesalite";
+    private static final String ACCESS_KEY = 
System.getenv("IT_CASE_GLUE_SCHEMA_ACCESS_KEY");
+    private static final String SECRET_KEY = 
System.getenv("IT_CASE_GLUE_SCHEMA_SECRET_KEY");
+
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, 
TimeUnit.MINUTES);
+
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new KinesaliteContainer(
+                            
DockerImageName.parse("instructure/kinesalite").withTag("latest"))
+                    .withNetwork(network)
+                    .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
+
+    private GSRKinesisPubsubClient kinesisClient;
+
+    @Before
+    public void setUp() throws Exception {
+        Assume.assumeTrue(
+                "Access key not configured, skipping test...",
+                !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
+        Assume.assumeTrue(
+                "Secret key not configured, skipping test...",
+                !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY));
+
+        Properties properties = KINESALITE.getContainerProperties();
+
+        kinesisClient = new GSRKinesisPubsubClient(properties);
+        kinesisClient.createStream(INPUT_STREAM, 2, properties);
+        kinesisClient.createStream(OUTPUT_STREAM, 2, properties);
+    }
+
+    @Test
+    public void testGSRAvroSpecificFormatWithFlink() throws Exception {
+        List<GenericRecord> messages = getRecords();
+        for (GenericRecord msg : messages) {
+            kinesisClient.sendMessage(getSchema().toString(), INPUT_STREAM, 
msg);
+        }
+        log.info("generated records");
+
+        executeFlinkForAvroSpecificFormat();
+
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60));
+        List<Object> results = kinesisClient.readAllMessages(OUTPUT_STREAM);
+        while (deadline.hasTimeLeft() && results.size() < messages.size()) {
+            log.info("waiting for results..");
+            Thread.sleep(1000);
+            results = kinesisClient.readAllMessages(OUTPUT_STREAM);
+        }
+        log.info("results: {}", results);
+
+        Assert.assertEquals(
+                "Results received from '" + OUTPUT_STREAM + "': " + results,
+                messages.size(),
+                results.size());
+
+        List<GenericRecord> expectedResults = getRecords();
+        for (Object expectedResult : expectedResults) {
+            Assert.assertTrue(results.contains(expectedResult));
+        }

Review comment:
       nit: you could do 
`Assert.assertTrue(results.containsAll(expectedResults))`

##########
File path: 
flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializer.java
##########
@@ -36,23 +36,24 @@
  * from it and remove schema registry information in the input stream.
  */
 public class GlueSchemaRegistryInputStreamDeserializer {

Review comment:
       Please make sure all classes in the source code carry an appropriate stability annotation (`@Public`, `@PublicEvolving` or `@Internal`).

##########
File path: 
flink-formats/flink-json-glue-schema-registry/src/main/java/org/apache/flink/formats/json/glue/schema/registry/GlueSchemaRegistryJsonDeserializationSchema.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.json.glue.schema.registry;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.util.Map;
+
+/**
+ * AWS Glue Schema Registry Deserialization schema to de-serialize JSON Schema 
binary format for
+ * Flink Consumer user.
+ *
+ * @param <T> type of record it produces
+ */
+public class GlueSchemaRegistryJsonDeserializationSchema<T> implements 
DeserializationSchema<T> {

Review comment:
       Again, please annotate this class: 
https://github.com/apache/flink/tree/master/flink-annotations/src/main/java/org/apache/flink/annotation

##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/generic/GlueSchemaRegistryGenericKinesisExampleTest.java
##########
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test.json.generic;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.glue.schema.registry.test.json.GSRKinesisPubsubClient;
+
+import 
com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Test driver for {@link GlueSchemaRegistryGenericKinesisExample#main}. */
+public class GlueSchemaRegistryGenericKinesisExampleTest {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(GlueSchemaRegistryGenericKinesisExampleTest.class);
+
+    public static void main(String[] args) throws Exception {

Review comment:
       Where/how is this class used? Was it originally used by the `sh` e2e 
tests? Can it be removed?

##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/specific/GlueSchemaRegistryPojoKinesisExampleTest.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test.json.specific;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.glue.schema.registry.test.json.GSRKinesisPubsubClient;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Test driver for {@link GlueSchemaRegistryPojoKinesisExample#main}. */
+public class GlueSchemaRegistryPojoKinesisExampleTest {
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(GlueSchemaRegistryPojoKinesisExampleTest.class);
+
+    public static void main(String[] args) throws Exception {

Review comment:
       Where/how is this class used? Was it originally used by the `sh` e2e 
tests? Can it be removed?

##########
File path: flink-end-to-end-tests/test-scripts/test_glue_schema_registry_avro.sh
##########
@@ -25,7 +25,7 @@
 
################################################################################

Review comment:
       I think this entire script can be deleted, since we have now migrated 
the test to the Java framework?

##########
File path: flink-end-to-end-tests/test-scripts/test_glue_schema_registry_json.sh
##########
@@ -0,0 +1,89 @@
+#!/usr/bin/env bash

Review comment:
       I think this script can be deleted too, now that we have migrated to the Java 
e2e tests?

##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-avro-test/src/test/java/org/apache/flink/glue/schema/registry/test/GlueSchemaRegistryAvroKinesisITCase.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import 
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroDeserializationSchema;
+import 
org.apache.flink.formats.avro.glue.schema.registry.GlueSchemaRegistryAvroSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
+import org.apache.flink.tests.util.categories.TravisGroup1;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.Timeout;
+import org.testcontainers.containers.Network;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.net.URL;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.STREAM_INITIAL_POSITION;
+
+/** End-to-end test for Glue Schema Registry AVRO format using Kinesalite. */
+@Category(value = {TravisGroup1.class})
+public class GlueSchemaRegistryAvroKinesisITCase extends TestLogger {
+    private static final String INPUT_STREAM = "gsr_avro_input_stream";
+    private static final String OUTPUT_STREAM = "gsr_avro_output_stream";
+    private static final String INTER_CONTAINER_KINESALITE_ALIAS = 
"kinesalite";
+    private static final String ACCESS_KEY = 
System.getenv("IT_CASE_GLUE_SCHEMA_ACCESS_KEY");
+    private static final String SECRET_KEY = 
System.getenv("IT_CASE_GLUE_SCHEMA_SECRET_KEY");
+
+    private static final Network network = Network.newNetwork();
+
+    @ClassRule public static final Timeout TIMEOUT = new Timeout(10, 
TimeUnit.MINUTES);
+
+    @ClassRule
+    public static final KinesaliteContainer KINESALITE =
+            new KinesaliteContainer(
+                            
DockerImageName.parse("instructure/kinesalite").withTag("latest"))
+                    .withNetwork(network)
+                    .withNetworkAliases(INTER_CONTAINER_KINESALITE_ALIAS);
+
+    private GSRKinesisPubsubClient kinesisClient;
+
+    @Before
+    public void setUp() throws Exception {
+        Assume.assumeTrue(
+                "Access key not configured, skipping test...",
+                !StringUtils.isNullOrWhitespaceOnly(ACCESS_KEY));
+        Assume.assumeTrue(
+                "Secret key not configured, skipping test...",
+                !StringUtils.isNullOrWhitespaceOnly(SECRET_KEY));
+
+        Properties properties = KINESALITE.getContainerProperties();
+
+        kinesisClient = new GSRKinesisPubsubClient(properties);
+        kinesisClient.createStream(INPUT_STREAM, 2, properties);
+        kinesisClient.createStream(OUTPUT_STREAM, 2, properties);
+    }
+
+    @Test
+    public void testGSRAvroSpecificFormatWithFlink() throws Exception {
+        List<GenericRecord> messages = getRecords();
+        for (GenericRecord msg : messages) {
+            kinesisClient.sendMessage(getSchema().toString(), INPUT_STREAM, 
msg);
+        }
+        log.info("generated records");
+
+        executeFlinkForAvroSpecificFormat();
+
+        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60));
+        List<Object> results = kinesisClient.readAllMessages(OUTPUT_STREAM);
+        while (deadline.hasTimeLeft() && results.size() < messages.size()) {
+            log.info("waiting for results..");
+            Thread.sleep(1000);
+            results = kinesisClient.readAllMessages(OUTPUT_STREAM);
+        }
+        log.info("results: {}", results);
+
+        Assert.assertEquals(
+                "Results received from '" + OUTPUT_STREAM + "': " + results,
+                messages.size(),
+                results.size());
+
+        List<GenericRecord> expectedResults = getRecords();
+        for (Object expectedResult : expectedResults) {
+            Assert.assertTrue(results.contains(expectedResult));
+        }
+    }
+
+    private void executeFlinkForAvroSpecificFormat() throws Exception {
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        Properties properties = KINESALITE.getContainerProperties();
+        properties.setProperty(
+                STREAM_INITIAL_POSITION,
+                ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name());
+        FlinkKinesisConsumer<GenericRecord> consumer =
+                new FlinkKinesisConsumer<>(
+                        INPUT_STREAM,
+                        GlueSchemaRegistryAvroDeserializationSchema.forGeneric(
+                                getSchema(), getConfigs()),
+                        properties);
+
+        Properties producerProperties = new Properties(properties);
+        // producer needs region even when URL is specified
+        producerProperties.put(ConsumerConfigConstants.AWS_REGION, 
"ca-central-1");
+        // test driver does not deaggregate
+        producerProperties.put("AggregationEnabled", String.valueOf(false));
+

Review comment:
       nit: It would be more readable to split this method up into smaller 
methods, since it is quite large. For instance, this whole method could be 
simplified to something like this if you move source and sink creation:
   
   ```
   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   env.setParallelism(1);
   
   DataStream<GenericRecord> input = env.addSource(createSource());
   input.addSink(createSink());
   env.executeAsync();
   
   ```

##########
File path: 
flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/specific/GlueSchemaRegistryPojoKinesisExample.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test.json.specific;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import 
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonDeserializationSchema;
+import 
org.apache.flink.formats.json.glue.schema.registry.GlueSchemaRegistryJsonSerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This 
will read JSON messages
+ * from the input stream, and finally write back to another stream.
+ *
+ * <p>Example usage: --input-stream test-input --output-stream test-output 
--aws.endpoint
+ * https://localhost:4567 --flink.stream.initpos TRIM_HORIZON
+ */
+public class GlueSchemaRegistryPojoKinesisExample {
+
+    public static void main(String[] args) throws Exception {

Review comment:
       Where/how is this class used? Was it originally used by the `sh` e2e 
tests? Can it be removed?

##########
File path: 
flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializer.java
##########
@@ -36,23 +36,24 @@
  * from it and remove schema registry information in the input stream.
  */
 public class GlueSchemaRegistryInputStreamDeserializer {

Review comment:
       This class does not have an annotation to indicate its stability. Please 
[annotate](https://github.com/apache/flink/tree/master/flink-annotations/src/main/java/org/apache/flink/annotation)
 using `PublicEvolving/Public/Internal`. This change looks backwards 
incompatible, so I would favour `PublicEvolving` if this is a public class




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to