Repository: nifi
Updated Branches:
  refs/heads/master 73702004b -> 412b3fbbe


NIFI-4459: This closes #2240. When iterating over many Confluent schemas, 
catch the exception if one of them cannot be loaded; in that case, log a 
WARNING and continue with the remaining schemas. Also updated the Jersey 
client to the newest version.

Signed-off-by: joewitt <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/412b3fbb
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/412b3fbb
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/412b3fbb

Branch: refs/heads/master
Commit: 412b3fbbe2d43ad67e1e119586c507bd5258cccb
Parents: 7370200
Author: Mark Payne <[email protected]>
Authored: Mon Oct 30 11:59:55 2017 -0400
Committer: joewitt <[email protected]>
Committed: Tue Nov 21 12:52:05 2017 -0500

----------------------------------------------------------------------
 .../pom.xml                                     | 10 ++++++--
 .../schemaregistry/ConfluentSchemaRegistry.java |  2 +-
 .../client/RestSchemaRegistryClient.java        | 27 +++++++++++++++-----
 3 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/412b3fbb/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml
index 69f09b3..f762393 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/pom.xml
@@ -9,7 +9,8 @@
     License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
CONDITIONS 
     OF ANY KIND, either express or implied. See the License for the specific 
     language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
     <parent>
@@ -49,10 +50,15 @@
             <version>${jersey.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+            <version>${jersey.version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-web-utils</artifactId>
         </dependency>
-        
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>

http://git-wip-us.apache.org/repos/asf/nifi/blob/412b3fbb/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index 070be75..113e096 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -137,7 +137,7 @@ public class ConfluentSchemaRegistry extends 
AbstractControllerService implement
             sslContext = 
sslContextService.createSSLContext(ClientAuth.REQUIRED);
         }
 
-        final SchemaRegistryClient restClient = new 
RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext);
+        final SchemaRegistryClient restClient = new 
RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, getLogger());
 
         final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
         final long cacheExpiration = 
context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();

http://git-wip-us.apache.org/repos/asf/nifi/blob/412b3fbb/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 76dd43a..b2ad19b 100644
--- 
a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ 
b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -20,12 +20,14 @@ package org.apache.nifi.confluent.schemaregistry.client;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
 import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.apache.nifi.web.util.WebUtils;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.node.ArrayNode;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
 import org.glassfish.jersey.client.ClientConfig;
 import org.glassfish.jersey.client.ClientProperties;
 
@@ -53,8 +55,10 @@ import java.util.concurrent.ConcurrentMap;
  * </p>
  */
 public class RestSchemaRegistryClient implements SchemaRegistryClient {
+
     private final List<String> baseUrls;
     private final Client client;
+    private final ComponentLog logger;
 
     private static final String SUBJECT_FIELD_NAME = "subject";
     private static final String VERSION_FIELD_NAME = "version";
@@ -65,13 +69,15 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
     private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap = 
new ConcurrentHashMap<>();
 
 
-    public RestSchemaRegistryClient(final List<String> baseUrls, final int 
timeoutMillis, final SSLContext sslContext) {
+    public RestSchemaRegistryClient(final List<String> baseUrls, final int 
timeoutMillis, final SSLContext sslContext, final ComponentLog logger) {
         this.baseUrls = new ArrayList<>(baseUrls);
 
         final ClientConfig clientConfig = new ClientConfig();
         clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
         clientConfig.property(ClientProperties.READ_TIMEOUT, timeoutMillis);
         client = WebUtils.createClient(clientConfig, sslContext);
+
+        this.logger = logger;
     }
 
 
@@ -107,10 +113,17 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
 
         final ArrayNode arrayNode = (ArrayNode) schemaNameArray;
         for (final JsonNode node : arrayNode) {
-            final String nodeName = node.getTextValue();
+            final String nodeName = node.asText();
 
             final String schemaPath = getSubjectPath(nodeName);
-            final JsonNode schemaNode = fetchJsonResponse(schemaPath, 
schemaDescription);
+            final JsonNode schemaNode;
+            try {
+                schemaNode = fetchJsonResponse(schemaPath, schemaDescription);
+            } catch (final SchemaNotFoundException | IOException e) {
+                logger.warn("Failed to fetch Schema with name '{}' from Confluent Schema Registry; "
+                    + "will skip this schema and continue attempting to retrieve other schemas", new Object[] {nodeName, e});
+                continue;
+            }
 
             final int id = schemaNode.get(ID_FIELD_NAME).asInt();
             schemaNameToIdentifierMap.put(nodeName, id);
@@ -125,10 +138,10 @@ public class RestSchemaRegistryClient implements 
SchemaRegistryClient {
     }
 
     private RecordSchema createRecordSchema(final JsonNode schemaNode) throws 
SchemaNotFoundException {
-        final String subject = 
schemaNode.get(SUBJECT_FIELD_NAME).getTextValue();
+        final String subject = schemaNode.get(SUBJECT_FIELD_NAME).asText();
         final int version = schemaNode.get(VERSION_FIELD_NAME).asInt();
         final int id = schemaNode.get(ID_FIELD_NAME).asInt();
-        final String schemaText = 
schemaNode.get(SCHEMA_TEXT_FIELD_NAME).getTextValue();
+        final String schemaText = 
schemaNode.get(SCHEMA_TEXT_FIELD_NAME).asText();
 
         try {
             final Schema avroSchema = new Schema.Parser().parse(schemaText);

Reply via email to