This is an automated email from the ASF dual-hosted git repository.
mthomsen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 060f4fe NIFI-6011 support for retrieving named schema versions
060f4fe is described below
commit 060f4fe73f087b9de103905ee3d8dd1d1b2ee64c
Author: Alex Savitsky <[email protected]>
AuthorDate: Fri Feb 8 10:16:11 2019 -0500
NIFI-6011 support for retrieving named schema versions
This closes #3297
Signed-off-by: Mike Thomsen <[email protected]>
---
.../schemaregistry/ConfluentSchemaRegistry.java | 8 +++++-
.../client/CachingSchemaRegistryClient.java | 18 +++++++++----
.../client/RestSchemaRegistryClient.java | 31 ++++++++++++----------
.../client/SchemaRegistryClient.java | 2 ++
4 files changed, 39 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index 2bf3a61..3567392 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -185,7 +185,13 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
throw new org.apache.nifi.schema.access.SchemaNotFoundException("Cannot retrieve schema because Schema Name is not present");
}
- final RecordSchema schema = client.getSchema(schemaName.get());
+ final RecordSchema schema;
+ if (schemaIdentifier.getVersion().isPresent()) {
+ schema = client.getSchema(schemaName.get(), schemaIdentifier.getVersion().getAsInt());
+ } else {
+ schema = client.getSchema(schemaName.get());
+ }
+
return schema;
}
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
index 9075ac2..365d500 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/CachingSchemaRegistryClient.java
@@ -19,17 +19,16 @@ package org.apache.nifi.confluent.schemaregistry.client;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.record.RecordSchema;
-import java.io.IOException;
import java.time.Duration;
-
public class CachingSchemaRegistryClient implements SchemaRegistryClient {
private final SchemaRegistryClient client;
private final LoadingCache<String, RecordSchema> nameCache;
+ private final LoadingCache<Pair<String, Integer>, RecordSchema> nameVersionCache;
private final LoadingCache<Integer, RecordSchema> idCache;
@@ -40,6 +39,10 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
.build(client::getSchema);
+ nameVersionCache = Caffeine.newBuilder()
+ .maximumSize(cacheSize)
+ .expireAfterWrite(Duration.ofNanos(expirationNanos))
+ .build(key -> client.getSchema(key.getLeft(), key.getRight()));
idCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.expireAfterWrite(Duration.ofNanos(expirationNanos))
@@ -47,12 +50,17 @@ public class CachingSchemaRegistryClient implements SchemaRegistryClient {
}
@Override
- public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
+ public RecordSchema getSchema(final String schemaName) {
return nameCache.get(schemaName);
}
@Override
- public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
+ public RecordSchema getSchema(String schemaName, int version) {
+ return nameVersionCache.get(Pair.of(schemaName, version));
+ }
+
+ @Override
+ public RecordSchema getSchema(final int schemaId) {
return idCache.get(schemaId);
}
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 31d9801..1bca29c 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -82,13 +82,19 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
@Override
public RecordSchema getSchema(final String schemaName) throws IOException, SchemaNotFoundException {
- final String pathSuffix = getSubjectPath(schemaName);
+ final String pathSuffix = getSubjectPath(schemaName, null);
final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);
- final RecordSchema recordSchema = createRecordSchema(responseJson);
- return recordSchema;
+ return createRecordSchema(responseJson);
}
+ @Override
+ public RecordSchema getSchema(final String schemaName, final int schemaVersion) throws IOException, SchemaNotFoundException {
+ final String pathSuffix = getSubjectPath(schemaName, schemaVersion);
+ final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name " + schemaName);
+
+ return createRecordSchema(responseJson);
+ }
@Override
public RecordSchema getSchema(final int schemaId) throws IOException, SchemaNotFoundException {
@@ -121,8 +127,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
throw new SchemaNotFoundException("could not get schema with id: " + schemaId);
}
- final RecordSchema recordSchema = createRecordSchema(completeSchema);
- return recordSchema;
+ return createRecordSchema(completeSchema);
}
private RecordSchema createRecordSchema(final JsonNode schemaNode) throws SchemaNotFoundException {
@@ -133,18 +138,18 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
try {
final Schema avroSchema = new Schema.Parser().parse(schemaText);
- final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id(Long.valueOf(id)).version(version).build();
+ final SchemaIdentifier schemaId = SchemaIdentifier.builder().name(subject).id((long) id).version(version).build();
- final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
- return recordSchema;
+ return AvroTypeUtil.createSchema(avroSchema, schemaText, schemaId);
} catch (final SchemaParseException spe) {
throw new SchemaNotFoundException("Obtained Schema with id " + id
+ " and name " + subject
+ " from Confluent Schema Registry but the Schema Text that was returned is not a valid Avro Schema");
}
}
- private String getSubjectPath(final String schemaName) throws UnsupportedEncodingException {
- return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/latest";
+ private String getSubjectPath(final String schemaName, final Integer schemaVersion) throws UnsupportedEncodingException {
+ return "/subjects/" + URLEncoder.encode(schemaName, "UTF-8") + "/versions/" +
+ (schemaVersion == null ? "latest" : URLEncoder.encode(String.valueOf(schemaVersion), "UTF-8"));
}
private String getSchemaPath(final int schemaId) throws UnsupportedEncodingException {
@@ -166,8 +171,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
}
if(responseCode == Response.Status.OK.getStatusCode()) {
- final JsonNode responseJson = response.readEntity(JsonNode.class);
- return responseJson;
+ return response.readEntity(JsonNode.class);
}
}
@@ -187,8 +191,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final int responseCode = response.getStatus();
if (responseCode == Response.Status.OK.getStatusCode()) {
- final JsonNode responseJson = response.readEntity(JsonNode.class);
- return responseJson;
+ return response.readEntity(JsonNode.class);
}
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
index 3c8c0cb..c12c258 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/SchemaRegistryClient.java
@@ -26,5 +26,7 @@ public interface SchemaRegistryClient {
RecordSchema getSchema(String schemaName) throws IOException, SchemaNotFoundException;
+ RecordSchema getSchema(String schemaName, int version) throws IOException, SchemaNotFoundException;
+
RecordSchema getSchema(int schemaId) throws IOException, SchemaNotFoundException;
}