[
https://issues.apache.org/jira/browse/NIFI-1763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060943#comment-16060943
]
ASF GitHub Bot commented on NIFI-1763:
--------------------------------------
Github user markap14 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1938#discussion_r123756588
--- Diff:
nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.confluent.schemaregistry.client;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.net.ssl.SSLContext;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaParseException;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.SchemaIdentifier;
+import org.apache.nifi.web.util.WebUtils;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.node.ArrayNode;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+
+
+/**
+ * <p>
+ * A Client for interacting with Confluent Schema Registry. We make use of Jersey Client to interact with the
+ * Confluent Schema Registry REST API because the provided schema registry client does not provide a way to
+ * use HTTPS for interacting with the schema registry (it assumes that system properties will be used, instead of
+ * an SSLContext) and also does not allow configuration of (or use) timeouts. As a result, if the Schema Registry
+ * crashed or was shut down, NiFi threads could be stuck indefinitely until NiFi is restarted. To avoid this,
+ * we make use of Jersey Client and set timeouts appropriately.
+ * </p>
+ */
+public class RestSchemaRegistryClient implements SchemaRegistryClient {
+ private final List<String> baseUrls;
+ private final Client client;
+
+ private static final String SUBJECT_FIELD_NAME = "subject";
+ private static final String VERSION_FIELD_NAME = "version";
+ private static final String ID_FIELD_NAME = "id";
+ private static final String SCHEMA_TEXT_FIELD_NAME = "schema";
+
+ private final ConcurrentMap<String, Integer> schemaNameToIdentifierMap
= new ConcurrentHashMap<>();
+ private final ConcurrentMap<Integer, String> schemaIdentifierToNameMap
= new ConcurrentHashMap<>();
+
+
+ public RestSchemaRegistryClient(final List<String> baseUrls, final int
timeoutMillis, final SSLContext sslContext) {
+ this.baseUrls = new ArrayList<>(baseUrls);
+
+ final ClientConfig clientConfig = new DefaultClientConfig();
+
clientConfig.getProperties().put(ClientConfig.PROPERTY_CONNECT_TIMEOUT,
timeoutMillis);
+
clientConfig.getProperties().put(ClientConfig.PROPERTY_READ_TIMEOUT,
timeoutMillis);
+ client = WebUtils.createClient(clientConfig, sslContext);
+ }
+
+
+ @Override
+ public RecordSchema getSchema(final String schemaName) throws
IOException, SchemaNotFoundException {
+ final String pathSuffix = getSubjectPath(schemaName);
+ final JsonNode responseJson = fetchJsonResponse(pathSuffix, "name
" + schemaName);
+
+ final RecordSchema recordSchema = createRecordSchema(responseJson);
+ return recordSchema;
+ }
+
+
+ @Override
+ public RecordSchema getSchema(final int schemaId) throws IOException,
SchemaNotFoundException {
+ // The Confluent Schema Registry's REST API will not provide us with the 'subject' (name) of a Schema given the ID.
--- End diff --
Agreed.
> Provide an integration with 'Schema registry for Kafka'
> -------------------------------------------------------
>
> Key: NIFI-1763
> URL: https://issues.apache.org/jira/browse/NIFI-1763
> Project: Apache NiFi
> Issue Type: Wish
> Components: Extensions
> Reporter: Joseph Witt
> Assignee: Mark Payne
> Priority: Minor
>
> Raised in a mailing list question on 13 April 2016; see
> https://github.com/confluentinc/schema-registry
> The registry itself is an ASLv2-licensed codebase. It offers a REST-based
> Web API for interaction. It would be good to support integration with it for
> users of Kafka, so that NiFi could register schemas if needed when writing to
> Kafka and understand how to parse data based on the indicated schema when
> reading from Kafka.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)