This is an automated email from the ASF dual-hosted git repository.

mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 060f4fe  NIFI-6011 support for retrieving named schema versions
060f4fe is described below

commit 060f4fe73f087b9de103905ee3d8dd1d1b2ee64c
Author: Alex Savitsky <[email protected]>
AuthorDate: Fri Feb 8 10:16:11 2019 -0500

    NIFI-6011 support for retrieving named schema versions
    
    This closes #3297
    
    Signed-off-by: Mike Thomsen <[email protected]>
---
 .../schemaregistry/ConfluentSchemaRegistry.java    |  8 +++++-
 .../client/CachingSchemaRegistryClient.java        | 18 +++++++++----
 .../client/RestSchemaRegistryClient.java           | 31 ++++++++++++----------
 .../client/SchemaRegistryClient.java               |  2 ++
 4 files changed, 39 insertions(+), 20 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index 2bf3a61..3567392 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -185,7 +185,13 @@ public class ConfluentSchemaRegistry extends 
AbstractControllerService implement
             throw new 
org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema 
because Schema Name is not present");
         }
 
-        final RecordSchema schema = client.getSchema(schemaName.get());
+        final RecordSchema schema;
+        if (schemaIdentifier.getVersion().isPresent()) {
+            schema = client.getSchema(schemaName.get(), 
schemaIdentifier.getVersion().getAsInt());
+        } else {
+            schema = client.getSchema(schemaName.get());
+        }
+
         return schema;
     }
 
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
index 9075ac2..365d500 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
@@ -19,17 +19,16 @@ package org.apache.nifi.confluent.schemaregistry.client;
 
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-import java.io.IOException;
 import java.time.Duration;
 
-
 public class CachingSchemaRegistryClient implements SchemaRegistryClient {
     private final SchemaRegistryClient client;
 
     private final LoadingCache<String, RecordSchema> nameCache;
+    private final LoadingCache<Pair<String, Integer>, RecordSchema> 
nameVersionCache;
     private final LoadingCache<Integer, RecordSchema> idCache;
 
 
@@ -40,6 +39,10 @@ public class CachingSchemaRegistryClient implements 
SchemaRegistryClient {
                 .maximumSize(cacheSize)
                 .expireAfterWrite(Duration.ofNanos(expirationNanos))
                 .build(client::getSchema);
+        nameVersionCache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .expireAfterWrite(Duration.ofNanos(expirationNanos))
+                .build(key -> client.getSchema(key.getLeft(), key.getRight()));
         idCache = Caffeine.newBuilder()
                 .maximumSize(cacheSize)
                 .expireAfterWrite(Duration.ofNanos(expirationNanos))
@@ -47,12 +50,17 @@ public class CachingSchemaRegistryClient implements 
SchemaRegistryClient {
     }
 
     @Override
-    public RecordSchema getSchema(final String schemaName) throws IOException, 
SchemaNotFoundException {
+    public RecordSchema getSchema(final String schemaName) {
         return nameCache.get(schemaName);
     }
 
     @Override
-    public RecordSchema getSchema(final int schemaId) throws IOException, 
SchemaNotFoundException {
+    public RecordSchema getSchema(String schemaName, int version) {
+        return nameVersionCache.get(Pair.of(schemaName, version));
+    }
+
+    @Override
+    public RecordSchema getSchema(final int schemaId) {
         return idCache.get(schemaId);
     }
 
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 31d9801..1bca29c 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -82,13 +82,19 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
 
     @Override
     public RecordSchema getSchema(final String schemaName) throws IOException, 
SchemaNotFoundException {
-        final String pathSuffix = getSubjectPath(schemaName);
+        final String pathSuffix = getSubjectPath(schemaName, null);
         final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + 
schemaName);
 
-        final RecordSchema recordSchema = createRecordSchema(responseJson);
-        return recordSchema;
+        return createRecordSchema(responseJson);
     }
 
+    @Override
+    public RecordSchema getSchema(final String schemaName, final int 
schemaVersion) throws IOException, SchemaNotFoundException {
+        final String pathSuffix = getSubjectPath(schemaName, schemaVersion);
+        final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + 
schemaName);
+
+        return createRecordSchema(responseJson);
+    }
 
     @Override
     public RecordSchema getSchema(final int schemaId) throws IOException, 
SchemaNotFoundException {
@@ -121,8 +127,7 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
             throw new SchemaNotFoundException("could not get schema with id: " 
+ schemaId);
         }
 
-        final RecordSchema recordSchema = createRecordSchema(completeSchema);
-        return recordSchema;
+        return createRecordSchema(completeSchema);
     }
 
     private RecordSchema createRecordSchema(final JsonNode schemaNode) throws 
SchemaNotFoundException {
@@ -133,18 +138,18 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
 
         try {
             final Schema avroSchema = new Schema.Parser().parse(schemaText);
-            final SchemaIdentifier schemaId = 
SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
+            final SchemaIdentifier schemaId = 
SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
 
-            final RecordSchema recordSchema = 
AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
-            return recordSchema;
+            return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
         } catch (final SchemaParseException spe) {
             throw new SchemaNotFoundException("Obtained Schema with id " + id 
+ " and name " + subject
                 + " from Confluent Schema Registry but the Schema Text that 
was returned is not a valid Avro Schema");
         }
     }
 
-    private String getSubjectPath(final String schemaName) throws 
UnsupportedEncodingException {
-        return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + 
"/versions/latest";
+    private String getSubjectPath(final String schemaName, final Integer 
schemaVersion) throws UnsupportedEncodingException {
+        return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + 
"/versions/" +
+                (schemaVersion == null ? "latest" : 
URLEncoder.encode(String.valueOf(schemaVersion), "UTF-8"));
     }
 
     private String getSchemaPath(final int schemaId) throws 
UnsupportedEncodingException {
@@ -166,8 +171,7 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
             }
 
             if(responseCode == Response.Status.OK.getStatusCode()) {
-                final JsonNode responseJson = 
response.readEntity(JsonNode.class);
-                return responseJson;
+                return response.readEntity(JsonNode.class);
             }
         }
 
@@ -187,8 +191,7 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
             final int responseCode = response.getStatus();
 
             if (responseCode == Response.Status.OK.getStatusCode()) {
-                final JsonNode responseJson = 
response.readEntity(JsonNode.class);
-                return responseJson;
+                return response.readEntity(JsonNode.class);
             }
 
             if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
index 3c8c0cb..c12c258 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
@@ -26,5 +26,7 @@ public interface SchemaRegistryClient {
 
     RecordSchema getSchema(String schemaName) throws IOException, 
SchemaNotFoundException;
 
+    RecordSchema getSchema(String schemaName, int version) throws IOException, 
SchemaNotFoundException;
+
     RecordSchema getSchema(int schemaId) throws IOException, 
SchemaNotFoundException;
 }

Reply via email to