Repository: nifi
Updated Branches:
  refs/heads/master 4687e0fb3 -> 109109313


NIFI-4777: get schema by id even if not latest

This closes #2405.

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/10910931
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/10910931
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/10910931

Branch: refs/heads/master
Commit: 1091093133fdc3b9b970e23787b7bfa1dafdbc11
Parents: 4687e0f
Author: Internet <[email protected]>
Authored: Mon Jan 15 14:34:41 2018 +0200
Committer: Mark Payne <[email protected]>
Committed: Mon Mar 19 09:57:38 2018 -0400

----------------------------------------------------------------------
 .../client/RestSchemaRegistryClient.java        | 90 +++++++++++++-------
 1 file changed, 57 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/10910931/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 14e3e83..31d9801 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -33,6 +33,7 @@ import org.glassfish.jersey.client.ClientProperties;
 
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.client.Client;
+import javax.ws.rs.client.Entity;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -41,8 +42,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+
 
 /**
  * <p>
@@ -64,9 +64,8 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
     private static final String VERSION_FIELD_NAME = "version";
     private static final String ID_FIELD_NAME = "id";
     private static final String SCHEMA_TEXT_FIELD_NAME = "schema";
-
-    private final ConcurrentMap<String, Integer> schemaNameToIdentifierMap = 
new ConcurrentHashMap<>();
-    private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = 
new ConcurrentHashMap<>();
+    private static final String CONTENT_TYPE_HEADER = "Content-Type";
+    private static final String SCHEMA_REGISTRY_CONTENT_TYPE = 
"application/vnd.schemaregistry.v1+json";
 
 
     public RestSchemaRegistryClient(final List<String> baseUrls, final int 
timeoutMillis, final SSLContext sslContext, final ComponentLog logger) {
@@ -100,41 +99,30 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
         // To make this more efficient, we will cache a mapping of Schema Name 
to identifier, so that we can look this up more efficiently.
 
         // Check if we have cached the Identifier to Name mapping
-        final String schemaName = schemaIdentifierToNameMap.get(schemaId);
-        if (schemaName != null) {
-            return getSchema(schemaName);
-        }
-
-        final String schemaDescription = "identifier " + schemaId;
-        final JsonNode schemaNameArray = fetchJsonResponse("/subjects", 
schemaDescription);
-        if (!schemaNameArray.isArray()) {
-            throw new IOException("When determining Subjects that are 
available, expected a JSON Array but did not receive a valid response");
-        }
 
-        final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
-        for (final JsonNode node : arrayNode) {
-            final String nodeName = node.asText();
+        final String schemaPath = getSchemaPath(schemaId);
+        final JsonNode responseJson = fetchJsonResponse(schemaPath, "id " + 
schemaId);
+        final JsonNode subjectsJson = fetchJsonResponse("/subjects", "subjects 
array");
+        final ArrayNode subjectsList = (ArrayNode) subjectsJson;
 
-            final String schemaPath = getSubjectPath(nodeName);
-            final JsonNode schemaNode;
+        JsonNode completeSchema = null;
+        for (JsonNode subject: subjectsList) {
             try {
-                schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
-            } catch (final SchemaNotFoundException | IOException e) {
-                logger.warn("Failed to fetch Schema with name '{}' from 
Confluent Schema Registry; "
-                    + "will skip this schema and continue attempting to 
retrieve other schemas", new Object[] {nodeName, e});
+                final String subjectName = subject.asText();
+                completeSchema = postJsonResponse("/subjects/" + subjectName, 
responseJson, "schema id: " + schemaId);
+                break;
+            } catch (SchemaNotFoundException e) {
                 continue;
             }
 
-            final int id = schemaNode.get(ID_FIELD_NAME).asInt();
-            schemaNameToIdentifierMap.put(nodeName, id);
-            schemaIdentifierToNameMap.put(id, nodeName);
+        }
 
-            if (id == schemaId) {
-                return createRecordSchema(schemaNode);
-            }
+        if(completeSchema == null) {
+            throw new SchemaNotFoundException("could not get schema with id: " 
+ schemaId);
         }
 
-        throw new SchemaNotFoundException("Could not find a schema with 
identifier " + schemaId);
+        final RecordSchema recordSchema = createRecordSchema(completeSchema);
+        return recordSchema;
     }
 
     private RecordSchema createRecordSchema(final JsonNode schemaNode) throws 
SchemaNotFoundException {
@@ -159,11 +147,39 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
         return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + 
"/versions/latest";
     }
 
+    private String getSchemaPath(final int schemaId) throws 
UnsupportedEncodingException {
+        return "/schemas/ids/" + URLEncoder.encode(String.valueOf(schemaId), 
"UTF-8");
+    }
+
+    private JsonNode postJsonResponse(final String pathSuffix, final JsonNode 
schema, final String schemaDescription) throws SchemaNotFoundException {
+        String errorMessage = null;
+        for(final String baseUrl: baseUrls) {
+            final String path = getPath(pathSuffix);
+            final String trimmedBase = getTrimmedBase(baseUrl);
+            final String url = trimmedBase + path;
+            final WebTarget builder = client.target(url);
+            final Response response = 
builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER,
 SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
+            final int responseCode = response.getStatus();
+
+            if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
+                continue;
+            }
+
+            if(responseCode == Response.Status.OK.getStatusCode()) {
+                final JsonNode responseJson = 
response.readEntity(JsonNode.class);
+                return responseJson;
+            }
+        }
+
+        throw new SchemaNotFoundException("Failed to retrieve Schema with " + 
schemaDescription + " from any of the Confluent Schema Registry URL's provided; 
failure response message: "
+                + errorMessage);
+    }
+
     private JsonNode fetchJsonResponse(final String pathSuffix, final String 
schemaDescription) throws SchemaNotFoundException, IOException {
         String errorMessage = null;
         for (final String baseUrl : baseUrls) {
-            final String path = pathSuffix.startsWith("/") ? pathSuffix : "/" 
+ pathSuffix;
-            final String trimmedBase = baseUrl.endsWith("/") ? 
baseUrl.substring(0, baseUrl.length() - 1) : baseUrl;
+            final String path = getPath(pathSuffix);
+            final String trimmedBase = getTrimmedBase(baseUrl);
             final String url = trimmedBase + path;
 
             final WebTarget webTarget = client.target(url);
@@ -187,4 +203,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
         throw new IOException("Failed to retrieve Schema with " + 
schemaDescription + " from any of the Confluent Schema Registry URL's provided; 
failure response message: " + errorMessage);
     }
 
+    private String getTrimmedBase(String baseUrl) {
+        return baseUrl.endsWith("/") ? baseUrl.substring(0, baseUrl.length() - 
1) : baseUrl;
+    }
+
+    private String getPath(String pathSuffix) {
+        return pathSuffix.startsWith("/") ? pathSuffix : "/" + pathSuffix;
+    }
+
 }

Reply via email to