dannycranmer commented on a change in pull request #16513:
URL: https://github.com/apache/flink/pull/16513#discussion_r673993655



##########
File path: flink-formats/flink-json-glue-schema-registry/pom.xml
##########
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-formats</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-json-glue-schema-registry</artifactId>
+       <name>Flink : Formats : JSON AWS Glue Schema Registry</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <glue.schema.registry.version>1.1.1</glue.schema.registry.version>
+               <aws.sdkv2.version>2.16.92</aws.sdkv2.version>
+               <scala.binary.version>2.12</scala.binary.version>
+       </properties>
+
+       <repositories>
+               <repository>
+                       <id>jitpack.io</id>
+                       <url>https://jitpack.io</url>
+               </repository>
+       </repositories>

Review comment:
       Which dependencies are being pulled from this new repository? I need to verify that it is OK to add it.

##########
File path: flink-end-to-end-tests/run-nightly-tests.sh
##########
@@ -206,7 +206,8 @@ run_test "Walkthrough DataStream Scala nightly end-to-end test" "$END_TO_END_DIR
 run_test "Avro Confluent Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_confluent_schema_registry.sh"
 
 if [[ -n "$IT_CASE_GLUE_SCHEMA_ACCESS_KEY" ]] && [[ -n "$IT_CASE_GLUE_SCHEMA_SECRET_KEY" ]]; then
-  run_test "AWS Glue Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_glue_schema_registry.sh"
+  run_test "Avro AWS Glue Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_glue_schema_registry_avro.sh"
+  run_test "JSON AWS Glue Schema Registry nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_glue_schema_registry_json.sh"

Review comment:
       We are not meant to be adding new `.sh` e2e tests; there is a new Java framework to use instead. When we added the Avro test we also [raised a Jira](https://issues.apache.org/jira/browse/FLINK-21391) to migrate it to the e2e framework, since there was no existing Kinesis example at the time. Since then we have added an [e2e test for Kinesis](https://issues.apache.org/jira/browse/FLINK-20042). Can you please migrate this test to the Java framework? You can take inspiration from the Kinesis test; a rough sketch of the overall shape is below.
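   
   A minimal sketch of the shape such a test could take, assuming a plain JUnit test that reuses the `GSRKinesisPubsubClient` added in this PR and `JsonDataWithSchema` from the GSR serde library (the test class name, schema and the job-submission step are placeholders, not the actual e2e framework API):
   
   ```java
   import java.util.List;
   import java.util.Properties;
   import java.util.UUID;
   
   import org.apache.flink.glue.schema.registry.test.json.GSRKinesisPubsubClient;
   
   import com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema;
   import org.junit.Test;
   
   import static org.junit.Assert.assertFalse;
   
   /** Hypothetical skeleton only, to illustrate the structure of a Java e2e test. */
   public class GlueSchemaRegistryJsonITCase {
   
       private static final String SCHEMA =
               "{\"$schema\":\"http://json-schema.org/draft-04/schema#\","
                       + "\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}";
   
       @Test
       public void testJsonRoundTripThroughKinesis() throws Exception {
           Properties props = new Properties(); // AWS region/credentials would be configured here
           GSRKinesisPubsubClient client = new GSRKinesisPubsubClient(props);
   
           String stream = "gsr-json-e2e-" + UUID.randomUUID();
           client.createStream(stream, 1, props);
   
           // Publish one JSON record registered against the Glue Schema Registry ...
           client.sendMessage(
                   SCHEMA, stream, JsonDataWithSchema.builder(SCHEMA, "{\"name\":\"flink\"}").build());
   
           // ... run the Flink job under test via the e2e framework, then read the
           // output stream back and assert on the records.
           List<Object> records = client.readAllMessages(stream);
           assertFalse(records.isEmpty());
       }
   }
   ```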

##########
File path: flink-formats/flink-json-glue-schema-registry/pom.xml
##########
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-formats</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-json-glue-schema-registry</artifactId>
+       <name>Flink : Formats : JSON AWS Glue Schema Registry</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <glue.schema.registry.version>1.1.1</glue.schema.registry.version>
+               <aws.sdkv2.version>2.16.92</aws.sdkv2.version>
+               <scala.binary.version>2.12</scala.binary.version>
+       </properties>
+
+       <repositories>
+               <repository>
+                       <id>jitpack.io</id>
+                       <url>https://jitpack.io</url>
+               </repository>
+       </repositories>
+
+       <dependencies>
+               <!-- core dependencies -->
+
+               <dependency>

Review comment:
       Why are these Scala dependencies required? 

##########
File path: flink-formats/flink-json-glue-schema-registry/pom.xml
##########
@@ -0,0 +1,115 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-formats</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.14-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-json-glue-schema-registry</artifactId>
+       <name>Flink : Formats : JSON AWS Glue Schema Registry</name>
+       <packaging>jar</packaging>
+
+       <properties>
+               <glue.schema.registry.version>1.1.1</glue.schema.registry.version>

Review comment:
       I enquired about this on the last PR for `avro` support and was told it would be addressed. The GSR dependency tree seems to pull in unnecessary dependencies, for example `kafka-clients`. Is there a plan to trim the dependency footprint down to a minimal set?
   
   ```
   mvn dependency:tree -Dverbose
   
   ...
   [INFO] +- software.amazon.glue:schema-registry-serde:jar:1.1.1:compile
   ...
   [INFO] |  +- org.apache.kafka:kafka-clients:jar:2.5.0:compile
   ```

##########
File path: flink-formats/flink-avro-glue-schema-registry/src/main/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryOutputStreamSerializer.java
##########
@@ -72,17 +72,17 @@ public void registerSchemaAndSerializeStream(Schema schema, OutputStream out, by
                 glueSchemaRegistrySerializationFacade.encode(
                         transportName,
+                        new com.amazonaws.services.schemaregistry.common.Schema(
-                                schema.toString(), "Avro", getSchemaName()),
+                                schema.toString(), "AVRO", getSchemaName()),

Review comment:
       Is there not a constant? `DataFormat.AVRO` maybe?
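   
   i.e. something along these lines, assuming the `DataFormat` enum from `software.amazon.awssdk.services.glue.model` (already on the classpath via the GSR serde); `DataFormat.AVRO.name()` yields the same `"AVRO"` string without the magic literal:
   
   ```java
   import software.amazon.awssdk.services.glue.model.DataFormat;
   
   // Replaces the hard-coded "AVRO" string with the enum constant.
   new com.amazonaws.services.schemaregistry.common.Schema(
           schema.toString(), DataFormat.AVRO.name(), getSchemaName());
   ```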

##########
File path: flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/GSRKinesisPubsubClient.java
##########
@@ -0,0 +1,107 @@
+/*

Review comment:
       This seems to be duplicated from the class in the `avro` module. Is there a way we can reuse it rather than copy and paste? See the sketch below for one option.
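   
   A rough sketch of one option, assuming the client moves to a shared test-utils module (the module and constructor shape are placeholders): keep a single `GSRKinesisPubsubClient` and inject the `DataFormat`, so the Avro and JSON test modules only differ in the format they pass in:
   
   ```java
   import java.util.Properties;
   
   import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
   
   import software.amazon.awssdk.services.glue.model.DataFormat;
   
   // Hypothetical shared variant of the client; the existing methods stay as they
   // are, except that sendMessage()/serialize() use the injected dataFormat instead
   // of a per-module hard-coded DataFormat.JSON / DataFormat.AVRO.
   public class GSRKinesisPubsubClient {
       private final KinesisPubsubClient client;
       private final DataFormat dataFormat;
   
       public GSRKinesisPubsubClient(Properties properties, DataFormat dataFormat) {
           this.client = new KinesisPubsubClient(properties);
           this.dataFormat = dataFormat; // DataFormat.AVRO or DataFormat.JSON
       }
   }
   ```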

##########
File path: flink-end-to-end-tests/flink-glue-schema-registry-json-test/src/main/java/org/apache/flink/glue/schema/registry/test/json/GSRKinesisPubsubClient.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.glue.schema.registry.test.json;
+
+import org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
+
+import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput;
+import com.amazonaws.services.schemaregistry.common.AWSSerializerInput;
+import com.amazonaws.services.schemaregistry.common.configs.GlueSchemaRegistryConfiguration;
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializationFacade;
+import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.services.glue.model.DataFormat;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Simple client to publish and retrieve messages, using the AWS Kinesis SDK, Flink Kinesis
+ * Connectors and Glue Schema Registry classes.
+ */
+public class GSRKinesisPubsubClient {
+    private final KinesisPubsubClient client;
+
+    public GSRKinesisPubsubClient(Properties properties) {
+        this.client = new KinesisPubsubClient(properties);
+    }
+
+    public void sendMessage(String schema, String streamName, Object msg) {
+        UUID schemaVersionId =
+                createSerializationFacade()
+                        .getOrRegisterSchemaVersion(
+                                AWSSerializerInput.builder()
+                                        .schemaDefinition(schema)
+                                        .dataFormat(DataFormat.JSON.name())
+                                        .schemaName(streamName)
+                                        .transportName(streamName)
+                                        .build());
+
+        client.sendMessage(
+                streamName,
+                createSerializationFacade().serialize(DataFormat.JSON, msg, schemaVersionId));
+    }
+
+    public List<Object> readAllMessages(String streamName) throws Exception {
+        return client.readAllMessages(
+                streamName,
+                bytes ->
+                        createDeserializationFacade()
+                                .deserialize(
+                                        AWSDeserializerInput.builder()
+                                                .buffer(ByteBuffer.wrap(bytes))
+                                                .transportName(streamName)
+                                                .build()));
+    }
+
+    public void createStream(String stream, int shards, Properties props) throws Exception {
+        client.createTopic(stream, shards, props);
+    }
+
+    private Map<String, Object> getSerDeConfigs() {
+        Map<String, Object> configs = new HashMap();
+        configs.put(AWSSchemaRegistryConstants.AWS_REGION, "ca-central-1");
+        configs.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
+        configs.put(
+                AWSSchemaRegistryConstants.AVRO_RECORD_TYPE,

Review comment:
       Why is this referencing `AVRO` in the `JSON` test package?

##########
File path: flink-formats/flink-avro-glue-schema-registry/src/test/java/org/apache/flink/formats/avro/glue/schema/registry/GlueSchemaRegistryInputStreamDeserializerTest.java
##########
@@ -205,12 +209,13 @@ public void testGetSchemaAndDeserializedStream_withWrongSchema_throwsException()
         MutableByteArrayInputStream mutableByteArrayInputStream = new MutableByteArrayInputStream();
         glueSchema =
                 new com.amazonaws.services.schemaregistry.common.Schema(
-                        schemaDefinition, "Avro", testTopic);
-        mockDeserializer =
-                new MockAWSDeserializer(
-                        new byte[0], glueSchema, AWSSchemaRegistryConstants.COMPRESSION.NONE);
+                        schemaDefinition, "AVRO", testTopic);

Review comment:
       `DataFormat.AVRO` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
