awelless commented on code in PR #10062:
URL: https://github.com/apache/nifi/pull/10062#discussion_r2180275134


##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/main/java/org/apache/nifi/aws/schemaregistry/WireFormatSchemaVersionIdUtil.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.aws.schemaregistry;
+
+import java.util.Optional;
+import java.util.UUID;
+
+public class WireFormatSchemaVersionIdUtil {

Review Comment:
   Nit: instead of using a static util class, we can create a dedicated `WireFormatAwsGlueSchemaId` type.
   E.g.
   ```java
   record WireFormatAwsGlueSchemaId(UUID id) {
     String toSchemaName() {
       return prefix + id;
     }

     static Optional<WireFormatAwsGlueSchemaId> fromSchemaName(String name) {
       ...
     }
   }
   ```
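
   A fuller version of this sketch might look like the following. It is hypothetical. The `PREFIX` value is a placeholder, because the prefix actually used by `WireFormatSchemaVersionIdUtil` is not shown in this review. `fromSchemaName` returns an empty `Optional` for names that lack the prefix or do not carry a parseable UUID.

   ```java
   import java.util.Optional;
   import java.util.UUID;

   // Hypothetical value object wrapping an AWS Glue schema version id carried on the wire.
   record WireFormatAwsGlueSchemaId(UUID id) {

       // Placeholder prefix (assumption): marks schema names that encode a Glue schema version id.
       static final String PREFIX = "aws-glue-schema-version-id:";

       // Encodes the version id as a schema name: PREFIX followed by the UUID string.
       String toSchemaName() {
           return PREFIX + id;
       }

       // Decodes a name produced by toSchemaName(); returns empty for any other name.
       static Optional<WireFormatAwsGlueSchemaId> fromSchemaName(final String name) {
           if (name == null || !name.startsWith(PREFIX)) {
               return Optional.empty();
           }
           try {
               return Optional.of(new WireFormatAwsGlueSchemaId(UUID.fromString(name.substring(PREFIX.length()))));
           } catch (final IllegalArgumentException e) {
               // The suffix after the prefix is not a parseable UUID.
               return Optional.empty();
           }
       }
   }
   ```

   Keeping the encode/decode pair on one type lets callers pass a typed id around instead of a raw `String`.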



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/main/java/org/apache/nifi/aws/schemaregistry/client/CachingSchemaRegistryClient.java:
##########
@@ -23,11 +23,13 @@
 import org.apache.nifi.serialization.record.RecordSchema;
 
 import java.time.Duration;
+import java.util.UUID;
 
 public class CachingSchemaRegistryClient implements SchemaRegistryClient {
     private final SchemaRegistryClient client;
     private final LoadingCache<String, RecordSchema> nameCache;
     private final LoadingCache<Pair<String, Long>, RecordSchema> nameVersionCache;
+    private final LoadingCache<UUID, RecordSchema> nameVersionIdCache;

Review Comment:
   Nit: shouldn't it be `schemaVersionIdCache`, as in the `getSchema` method?



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/main/java/org/apache/nifi/aws/schemaregistry/AmazonGlueEncodedSchemaReferenceReader.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.aws.schemaregistry;
+
+import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryDeserializerDataParser;
+import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaReferenceReader;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.stream.io.StreamUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+@Tags({"schema", "registry", "aws", "avro", "glue"})
+@CapabilityDescription("Reads Schema Identifier according to AWS Glue Schema encoding as a header consisting of a two byte markers and a 16 byte UUID")
+public class AmazonGlueEncodedSchemaReferenceReader extends AbstractControllerService implements SchemaReferenceReader {
+
+    private final GlueSchemaRegistryDeserializerDataParser deserializerDataParser = GlueSchemaRegistryDeserializerDataParser.getInstance();
+
+    private static final int HEADER_CAPACITY = AWSSchemaRegistryConstants.HEADER_VERSION_BYTE_SIZE
+                    + AWSSchemaRegistryConstants.COMPRESSION_BYTE_SIZE
+                    + AWSSchemaRegistryConstants.SCHEMA_VERSION_ID_SIZE;
+
+    private static final Set<SchemaField> SUPPLIED_SCHEMA_FIELDS = Set.of(SchemaField.SCHEMA_BRANCH_NAME);

Review Comment:
   ```suggestion
       private static final Set<SchemaField> SUPPLIED_SCHEMA_FIELDS = Set.of(SchemaField.SCHEMA_NAME);
   ```
   It should be the schema name, not the branch name.
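
   For context on what this reader decodes: per the capability description, the header is two marker bytes followed by a 16-byte UUID, which is the schema version id that this review suggests surfacing through `SchemaField.SCHEMA_NAME`. Below is a minimal stdlib-only sketch of decoding such a header. It assumes a header version byte of `3` and a UUID stored as two big-endian longs, both taken to match the AWS Glue Schema Registry serde (`AWSSchemaRegistryConstants`). The class and method names are hypothetical, and the compression byte is skipped without validation.

   ```java
   import java.nio.ByteBuffer;
   import java.util.Optional;
   import java.util.UUID;

   // Hypothetical helper; not part of the PR.
   final class GlueHeaderSketch {

       // Assumed to match AWSSchemaRegistryConstants.HEADER_VERSION_BYTE.
       static final byte HEADER_VERSION_BYTE = 3;

       // Header version byte + compression byte + 16-byte schema version id.
       static final int HEADER_SIZE = 1 + 1 + 16;

       private GlueHeaderSketch() {
       }

       // Returns the schema version id if the bytes start with a recognized header, otherwise empty.
       static Optional<UUID> readSchemaVersionId(final byte[] header) {
           if (header.length < HEADER_SIZE || header[0] != HEADER_VERSION_BYTE) {
               return Optional.empty();
           }
           // Skip the version and compression bytes, then read the UUID as two big-endian longs.
           final ByteBuffer buffer = ByteBuffer.wrap(header, 2, 16);
           final long mostSignificantBits = buffer.getLong();
           final long leastSignificantBits = buffer.getLong();
           return Optional.of(new UUID(mostSignificantBits, leastSignificantBits));
       }
   }
   ```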



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/test/java/org/apache/nifi/aws/schemaregistry/client/GlueSchemaRegistryClientTest.java:
##########
@@ -83,4 +87,28 @@ void getSchemaWithNameAndVersionInvokesClientAndReturnsRecordSchema() throws IOE
         assertEquals(EXPECTED_SCHEMA_NAME, actualSchema.getSchemaName().orElseThrow(() -> new RuntimeException("Schema name not found")));
         verify(mockClient).getSchemaVersion(any(GetSchemaVersionRequest.class));
     }
+
+    @Test
+    void getSchemaWithNameVersionIdInvokesClientAndReturnsRecordSchema() throws IOException, SchemaNotFoundException {
+        UUID schemaVersionId = UUID.randomUUID();
+        String schemaArn = "arn:aws:glue:us-east-1:123456789012:schema/registry/name";
+
+        final GetSchemaVersionResponse mockResponse = GetSchemaVersionResponse.builder()
+                .dataFormat(DataFormat.AVRO)
+                .schemaDefinition(SCHEMA_DEFINITION)
+                .versionNumber(1L)
+                .schemaArn(schemaArn)
+                .build();
+
+        schemaRegistryClient = new GlueSchemaRegistryClient(mockClient, REGISTRY_NAME);
+
+        when(mockClient.getSchemaVersion(any(GetSchemaVersionRequest.class))).thenReturn(mockResponse);

Review Comment:
   Nit: we can verify that the expected `schemaVersionId` was passed (verbosity alert!). This needs a static import of `org.mockito.ArgumentMatchers.argThat`:
   ```suggestion
           when(mockClient.getSchemaVersion(argThat((GetSchemaVersionRequest req) -> schemaVersionId.toString().equals(req.schemaVersionId())))).thenReturn(mockResponse);
   ```



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/main/java/org/apache/nifi/aws/schemaregistry/WireFormatSchemaVersionIdUtil.java:
##########
@@ -0,0 +1,52 @@
+public class WireFormatSchemaVersionIdUtil {

Review Comment:
   Also, it seems this class can be package-private.



##########
nifi-extension-bundles/nifi-aws-bundle/nifi-aws-schema-registry-service/src/main/java/org/apache/nifi/aws/schemaregistry/client/GlueSchemaRegistryClient.java:
##########
@@ -1,109 +1,149 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.aws.schemaregistry.client;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaParseException;
-import org.apache.nifi.avro.AvroTypeUtil;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.SchemaIdentifier;
-import software.amazon.awssdk.services.glue.GlueClient;
-import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
-import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
-import software.amazon.awssdk.services.glue.model.SchemaId;
-import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
-
-import java.io.IOException;
-
-public class GlueSchemaRegistryClient implements SchemaRegistryClient {
-
-    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
-    private static final String NAMESPACE_FIELD_NAME = "namespace";
-
-    private final GlueClient client;
-    private final String registryName;
-
-    public GlueSchemaRegistryClient(final GlueClient client, final String registryName) {
-        this.client = client;
-        this.registryName = registryName;
-    }
-
-    @Override
-    public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
-        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
-                .latestVersion(true)
-                .build();
-
-        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);
-
-        return createRecordSchema(schemaVersionResponse);
-    }
-
-    @Override
-    public RecordSchema getSchema(final String schemaName, final long version) throws IOException, SchemaNotFoundException {
-        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
-                .versionNumber(version)
-                .build();
-
-        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);
-
-        return createRecordSchema(schemaVersionResponse);
-    }
-
-    private GetSchemaVersionResponse getSchemaVersionResponse(final String schemaName, final SchemaVersionNumber schemaVersionNumber) {
-        final SchemaId schemaId = buildSchemaId(schemaName);
-        final GetSchemaVersionRequest request = buildSchemaVersionRequest(schemaVersionNumber, schemaId);
-        return client.getSchemaVersion(request);
-    }
-
-    private GetSchemaVersionRequest buildSchemaVersionRequest(final SchemaVersionNumber schemaVersionNumber, final SchemaId schemaId) {
-        return GetSchemaVersionRequest.builder()
-                .schemaVersionNumber(schemaVersionNumber)
-                .schemaId(schemaId)
-                .build();
-    }
-
-    private SchemaId buildSchemaId(final String schemaName) {
-        return SchemaId.builder()
-                .registryName(registryName)
-                .schemaName(schemaName)
-                .build();
-    }
-
-    private RecordSchema createRecordSchema(final GetSchemaVersionResponse schemaVersionResponse) throws SchemaNotFoundException, JsonProcessingException {
-        final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaVersionResponse.schemaDefinition());
-        final String namespace = schemaNode.get(NAMESPACE_FIELD_NAME).asText();
-        final int version = schemaVersionResponse.versionNumber().intValue();
-        final String schemaText = schemaVersionResponse.schemaDefinition();
-
-        try {
-            final Schema avroSchema = new Schema.Parser().parse(schemaText);
-            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
-                    .name(namespace)
-                    .version(version)
-                    .build();
-            return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
-        } catch (final SchemaParseException spe) {
-            throw new SchemaNotFoundException("Obtained Schema with name " + namespace
-                    + " from Glue Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.aws.schemaregistry.client;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.glue.model.DataFormat;
+import software.amazon.awssdk.services.glue.model.GetSchemaVersionRequest;
+import software.amazon.awssdk.services.glue.model.GetSchemaVersionResponse;
+import software.amazon.awssdk.services.glue.model.SchemaId;
+import software.amazon.awssdk.services.glue.model.SchemaVersionNumber;
+
+import java.io.IOException;
+import java.util.UUID;
+
+public class GlueSchemaRegistryClient implements SchemaRegistryClient {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private static final String NAMESPACE_FIELD_NAME = "namespace";
+
+    private final GlueClient client;
+    private final String registryName;
+
+    public GlueSchemaRegistryClient(final GlueClient client, final String registryName) {
+        this.client = client;
+        this.registryName = registryName;
+    }
+
+    @Override
+    public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
+        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
+                .latestVersion(true)
+                .build();
+
+        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);
+
+        return createRecordSchema(schemaVersionResponse);
+    }
+
+    @Override
+    public RecordSchema getSchema(final String schemaName, final long version) throws IOException, SchemaNotFoundException {
+        final SchemaVersionNumber schemaVersionNumber = SchemaVersionNumber.builder()
+                .versionNumber(version)
+                .build();
+
+        final GetSchemaVersionResponse schemaVersionResponse = getSchemaVersionResponse(schemaName, schemaVersionNumber);
+
+        return createRecordSchema(schemaVersionResponse);
+    }
+
+    @Override
+    public RecordSchema getSchema(UUID schemaVersionId) throws SchemaNotFoundException {
+        final GetSchemaVersionResponse schemaVersionResponse = client.getSchemaVersion(
+                GetSchemaVersionRequest.builder()
+                        .schemaVersionId(schemaVersionId.toString())
+                        .build()
+        );
+
+        return createRecordSchemaFromSchemaVersionId(schemaVersionResponse);
+    }
+
+    private GetSchemaVersionResponse getSchemaVersionResponse(final String schemaName, final SchemaVersionNumber schemaVersionNumber) {
+        final SchemaId schemaId = buildSchemaId(schemaName);
+        final GetSchemaVersionRequest request = buildSchemaVersionRequest(schemaVersionNumber, schemaId);
+        return client.getSchemaVersion(request);
+    }
+
+    private GetSchemaVersionRequest buildSchemaVersionRequest(final SchemaVersionNumber schemaVersionNumber, final SchemaId schemaId) {
+        return GetSchemaVersionRequest.builder()
+                .schemaVersionNumber(schemaVersionNumber)
+                .schemaId(schemaId)
+                .build();
+    }
+
+    private SchemaId buildSchemaId(final String schemaName) {
+        return SchemaId.builder()
+                .registryName(registryName)
+                .schemaName(schemaName)
+                .build();
+    }
+
+    private RecordSchema createRecordSchema(final GetSchemaVersionResponse schemaVersionResponse) throws SchemaNotFoundException, JsonProcessingException {
+        final JsonNode schemaNode = OBJECT_MAPPER.readTree(schemaVersionResponse.schemaDefinition());
+        final String namespace = schemaNode.get(NAMESPACE_FIELD_NAME).asText();
+        final int version = schemaVersionResponse.versionNumber().intValue();
+        final String schemaText = schemaVersionResponse.schemaDefinition();
+
+        try {
+            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
+                    .name(namespace)
+                    .version(version)
+                    .build();
+            return parseSchema(schemaVersionResponse.dataFormat(), schemaText, schemaId);
+        } catch (final SchemaParseException spe) {
+            throw new SchemaNotFoundException("Obtained Schema with name " + namespace
+                    + " from Glue Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
+        }
+    }
+
+    private RecordSchema createRecordSchemaFromSchemaVersionId(final GetSchemaVersionResponse schemaVersionResponse) throws SchemaNotFoundException {
+        final int schemaVersionId = schemaVersionResponse.versionNumber().intValue();
+
+        try {
+            final SchemaIdentifier schemaId = SchemaIdentifier.builder()
+                    .name(schemaVersionResponse.schemaArn())
+                    .version(schemaVersionId)

Review Comment:
   Why are these fields different from the ones in `createRecordSchema`?
   In both cases we accept a `GetSchemaVersionResponse`. Do the field values differ depending on how a schema is retrieved?
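
   Whichever source is chosen for the identifier name, both methods should probably use the same one. If the plain Glue schema name is preferred over the full ARN, one option is to recover it from `schemaArn()`. This is an assumption, not what the PR does. The sketch below assumes the Glue schema ARN layout `arn:aws:glue:<region>:<account>:schema/<registry>/<schema>`, which matches the test's `arn:aws:glue:us-east-1:123456789012:schema/registry/name`. The class and method names are hypothetical.

   ```java
   import java.util.Optional;

   // Hypothetical helper; not part of the PR.
   final class GlueSchemaArnSketch {

       private GlueSchemaArnSketch() {
       }

       // Extracts <schema> from "arn:aws:glue:<region>:<account>:schema/<registry>/<schema>" (assumed layout).
       static Optional<String> schemaNameFromArn(final String arn) {
           final String[] parts = arn.split(":", 6);
           if (parts.length != 6 || !"arn".equals(parts[0]) || !"glue".equals(parts[2])) {
               return Optional.empty();
           }
           // The resource part is expected to be "schema/<registry>/<schema>".
           final String[] resource = parts[5].split("/");
           if (resource.length != 3 || !"schema".equals(resource[0])) {
               return Optional.empty();
           }
           return Optional.of(resource[2]);
       }
   }
   ```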


