Hiya, I’m late to this thread; I was just referred to the KIP from the
Confluent mailing list.  Nice idea!

I like the idea of being able to provide a Schema fallback for
JsonConverter.  Perhaps an interface that could return the schema given the
JsonNode?  For my use case, I have JSONSchemas resolvable via a
schema_uri field in my JSON data.  I’d like to be able to implement an
interface that would look up the JSONSchema for the given message, and then
convert that JSONSchema to a Connect Schema.

Then, instead of inferSchema being a simple method that is either enabled
or not, we could set a schema.resolver.type.  E.g., in pseudocode:

interface JsonSchemaResolver {
  public Schema apply(JsonNode value);
}

class InferSchemaResolver implements JsonSchemaResolver {
   public Schema apply(JsonNode value) {
      return inferSchema(value);
   }
}

Or for me and my custom resolver…

class JsonSchemaURIResolver implements JsonSchemaResolver {
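  // JSON Pointer to the schema URI field, e.g. "/schema_uri".
  private final String schemaUriPath;

  public JsonSchemaURIResolver(String schemaUriPath) {
    this.schemaUriPath = schemaUriPath;
    // …
  }

  public Schema apply(JsonNode value) {
    String schemaURI = value.at(schemaUriPath).asText();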
    // cache JSON schema here.
    return asConnectSchema(getJsonSchemaAtURI(schemaURI));
  }

  public Schema asConnectSchema(JsonNode schema) {
    // Recursively traverse schema and convert “type” fields to
    // Connect Schema types.
  }
}
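
To make the asConnectSchema idea a bit more concrete, here is a rough,
self-contained sketch of the recursive type mapping.  Everything in it is
an assumption for illustration: SimpleSchema and Type are hypothetical
stand-ins for Connect's Schema (so it runs without Kafka on the classpath),
the JSONSchema is a plain Map rather than a JsonNode, and only a handful of
JSONSchema types are handled.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class JsonSchemaTypeMapping {
    // Hypothetical stand-in for Connect's schema types.
    enum Type { STRING, INT64, FLOAT64, BOOLEAN, ARRAY, STRUCT }

    // Hypothetical stand-in for a Connect Schema.
    static final class SimpleSchema {
        final Type type;
        final Map<String, SimpleSchema> fields; // populated for STRUCT only
        final SimpleSchema items;               // populated for ARRAY only

        SimpleSchema(Type type, Map<String, SimpleSchema> fields, SimpleSchema items) {
            this.type = type;
            this.fields = fields;
            this.items = items;
        }

        static SimpleSchema primitive(Type type) {
            return new SimpleSchema(type, Collections.<String, SimpleSchema>emptyMap(), null);
        }
    }

    // Recursively maps the "type" fields of a JSONSchema (as a Map) to SimpleSchema.
    @SuppressWarnings("unchecked")
    static SimpleSchema asConnectSchema(Map<String, Object> jsonSchema) {
        Object type = jsonSchema.get("type");
        if (!(type instanceof String)) {
            throw new IllegalArgumentException("JSONSchema needs a string \"type\" field");
        }
        switch ((String) type) {
            case "string":  return SimpleSchema.primitive(Type.STRING);
            case "integer": return SimpleSchema.primitive(Type.INT64);
            case "number":  return SimpleSchema.primitive(Type.FLOAT64);
            case "boolean": return SimpleSchema.primitive(Type.BOOLEAN);
            case "array": {
                Object items = jsonSchema.get("items");
                if (!(items instanceof Map)) {
                    throw new IllegalArgumentException("array schema needs an \"items\" object");
                }
                SimpleSchema itemSchema = asConnectSchema((Map<String, Object>) items);
                return new SimpleSchema(Type.ARRAY,
                        Collections.<String, SimpleSchema>emptyMap(), itemSchema);
            }
            case "object": {
                Map<String, SimpleSchema> fields = new LinkedHashMap<>();
                Object props = jsonSchema.get("properties");
                if (props instanceof Map) {
                    // Convert each property schema in turn.
                    for (Map.Entry<String, Object> e : ((Map<String, Object>) props).entrySet()) {
                        fields.put(e.getKey(), asConnectSchema((Map<String, Object>) e.getValue()));
                    }
                }
                return new SimpleSchema(Type.STRUCT, fields, null);
            }
            default:
                throw new IllegalArgumentException("Unsupported JSONSchema type: " + type);
        }
    }
}
```

A real version would build Connect Schemas instead, and would have to decide
what to do with things like "required", nullable types and $ref.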

And in config:
# perhaps the default:
schema.resolver.type=org.apache.kafka.connect.json.InferSchemaResolver
# Or for me and my custom resolver…
schema.resolver.type=org.apache.kafka.connect.json.JsonSchemaURIResolver


If we were really fancy, I suppose the Resolver interface wouldn't have to
be JsonNode specific.  Then any Converter could be configured with a schema
resolver implementation that returned a Schema from an Object value.
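
A minimal sketch of what that generic version might look like, including
how a converter could instantiate the resolver named in
schema.resolver.type.  All names here (SchemaResolver, SketchSchema,
JavaTypeResolver, ResolverLoader) are made up for illustration and are not
existing Connect APIs; SketchSchema is just a stand-in for Connect's Schema
so the example is self-contained.

```java
import java.util.Map;

// Hypothetical stand-in for Connect's Schema.
final class SketchSchema {
    final String type;
    SketchSchema(String type) { this.type = type; }
}

// Resolver that is not tied to JsonNode: V is whatever the converter deserializes to.
interface SchemaResolver<V> {
    SketchSchema resolve(V value);
}

// Trivial example resolver that looks only at the Java type of the value.
final class JavaTypeResolver implements SchemaResolver<Object> {
    public JavaTypeResolver() {}

    @Override
    public SketchSchema resolve(Object value) {
        if (value instanceof String)  return new SketchSchema("string");
        if (value instanceof Boolean) return new SketchSchema("boolean");
        if (value instanceof Number)  return new SketchSchema("float64");
        throw new IllegalArgumentException("Cannot resolve a schema for: " + value);
    }
}

final class ResolverLoader {
    static final String RESOLVER_CONFIG = "schema.resolver.type";

    // Instantiates the configured resolver class via its no-arg constructor.
    @SuppressWarnings("unchecked")
    static SchemaResolver<Object> fromConfig(Map<String, String> config) {
        String className = config.get(RESOLVER_CONFIG);
        if (className == null) {
            throw new IllegalArgumentException("Missing config: " + RESOLVER_CONFIG);
        }
        try {
            Class<?> cls = Class.forName(className);
            if (!SchemaResolver.class.isAssignableFrom(cls)) {
                throw new IllegalArgumentException(className + " is not a SchemaResolver");
            }
            return (SchemaResolver<Object>) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Could not instantiate " + className, e);
        }
    }
}
```

A resolver that needs its own settings (like my schema URI path) would
probably want a configure(Map) style hook rather than constructor arguments,
since the class is created reflectively.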

(I believe this is similar to how Confluent’s AvroConverter works, but that
is Confluent Schema Registry specific.)

Anyway, just a thought.

- Andrew Otto
  Senior Systems Engineer
  Wikimedia Foundation
