http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
----------------------------------------------------------------------
diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
deleted file mode 100644
index ca8f029..0000000
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java
+++ /dev/null
@@ -1,735 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.json;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.cache.Cache;
-import org.apache.kafka.common.cache.LRUCache;
-import org.apache.kafka.common.cache.SynchronizedCache;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.copycat.data.*;
-import org.apache.kafka.copycat.errors.DataException;
-import org.apache.kafka.copycat.storage.Converter;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * Implementation of Converter that uses JSON to store schemas and objects.
- */
-public class JsonConverter implements Converter {
-    // Config key (and default) controlling whether converted data is wrapped in a schema/payload envelope.
-    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
-    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
-    // Config key naming the schema cache size setting, and the default capacity of each schema cache.
-    private static final String SCHEMAS_CACHE_CONFIG = "schemas.cache.size";
-    private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
-
-    // Table of JSON -> Copycat converters keyed by schema type; filled in by the static initializer below.
-    private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> 
TO_COPYCAT_CONVERTERS = new HashMap<>();
-
-    private static Object checkOptionalAndDefault(Schema schema) {
-        if (schema.defaultValue() != null)
-            return schema.defaultValue();
-        if (schema.isOptional())
-            return null;
-        throw new DataException("Invalid null value for required field");
-    }
-
-    static {
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.BOOLEAN, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.booleanValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT8, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return (byte) value.intValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT16, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return (short) value.intValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT32, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.intValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.INT64, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.longValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT32, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.floatValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT64, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.doubleValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.BYTES, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                try {
-                    if (value.isNull()) return checkOptionalAndDefault(schema);
-                    return value.binaryValue();
-                } catch (IOException e) {
-                    throw new DataException("Invalid bytes field", e);
-                }
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.STRING, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-                return value.textValue();
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.ARRAY, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
-                Schema elemSchema = schema == null ? null : 
schema.valueSchema();
-                ArrayList<Object> result = new ArrayList<>();
-                for (JsonNode elem : value) {
-                    result.add(convertToCopycat(elemSchema, elem));
-                }
-                return result;
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.MAP, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
-                Schema keySchema = schema == null ? null : schema.keySchema();
-                Schema valueSchema = schema == null ? null : 
schema.valueSchema();
-
-                // If the map uses strings for keys, it should be encoded in 
the natural JSON format. If it uses other
-                // primitive types or a complex type as a key, it will be 
encoded as a list of pairs. If we don't have a
-                // schema, we default to encoding in a Map.
-                Map<Object, Object> result = new HashMap<>();
-                if (schema == null || keySchema.type() == Schema.Type.STRING) {
-                    if (!value.isObject())
-                        throw new DataException("Map's with string fields 
should be encoded as JSON objects, but found " + value.getNodeType());
-                    Iterator<Map.Entry<String, JsonNode>> fieldIt = 
value.fields();
-                    while (fieldIt.hasNext()) {
-                        Map.Entry<String, JsonNode> entry = fieldIt.next();
-                        result.put(entry.getKey(), 
convertToCopycat(valueSchema, entry.getValue()));
-                    }
-                } else {
-                    if (!value.isArray())
-                        throw new DataException("Map's with non-string fields 
should be encoded as JSON array of tuples, but found " + value.getNodeType());
-                    for (JsonNode entry : value) {
-                        if (!entry.isArray())
-                            throw new DataException("Found invalid map entry 
instead of array tuple: " + entry.getNodeType());
-                        if (entry.size() != 2)
-                            throw new DataException("Found invalid map entry, 
expected length 2 but found :" + entry.size());
-                        result.put(convertToCopycat(keySchema, entry.get(0)),
-                                convertToCopycat(valueSchema, entry.get(1)));
-                    }
-                }
-                return result;
-            }
-        });
-        TO_COPYCAT_CONVERTERS.put(Schema.Type.STRUCT, new 
JsonToCopycatTypeConverter() {
-            @Override
-            public Object convert(Schema schema, JsonNode value) {
-                if (value.isNull()) return checkOptionalAndDefault(schema);
-
-                if (!value.isObject())
-                    throw new DataException("Structs should be encoded as JSON 
objects, but found " + value.getNodeType());
-
-                // We only have ISchema here but need Schema, so we need to 
materialize the actual schema. Using ISchema
-                // avoids having to materialize the schema for non-Struct 
types but it cannot be avoided for Structs since
-                // they require a schema to be provided at construction. 
However, the schema is only a SchemaBuilder during
-                // translation of schemas to JSON; during the more common 
translation of data to JSON, the call to schema.schema()
-                // just returns the schema Object and has no overhead.
-                Struct result = new Struct(schema.schema());
-                for (Field field : schema.fields())
-                    result.put(field, convertToCopycat(field.schema(), 
value.get(field.name())));
-
-                return result;
-            }
-        });
-    }
-
-    // Converters from a Copycat primitive representation to its logical type. They are looked up by logical type
-    // name; each one rejects a value whose Java class is not the expected underlying representation.
-    private static final HashMap<String, LogicalTypeConverter> TO_COPYCAT_LOGICAL_CONVERTERS = new HashMap<>();
-    static {
-        TO_COPYCAT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof byte[]) {
-                    return Decimal.toLogical(schema, (byte[]) value);
-                }
-                throw new DataException("Invalid type for Decimal, underlying representation should be bytes but was " + value.getClass());
-            }
-        });
-
-        TO_COPYCAT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof Integer) {
-                    return Date.toLogical(schema, (int) value);
-                }
-                throw new DataException("Invalid type for Date, underlying representation should be int32 but was " + value.getClass());
-            }
-        });
-
-        TO_COPYCAT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof Integer) {
-                    return Time.toLogical(schema, (int) value);
-                }
-                throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass());
-            }
-        });
-
-        TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof Long) {
-                    return Timestamp.toLogical(schema, (long) value);
-                }
-                throw new DataException("Invalid type for Timestamp, underlying representation should be int64 but was " + value.getClass());
-            }
-        });
-    }
-
-    // Converters from a logical-type value back to its underlying Copycat representation, looked up by logical type
-    // name before the value is rendered as JSON. Each one rejects a value of an unexpected Java class.
-    private static final HashMap<String, LogicalTypeConverter> TO_JSON_LOGICAL_CONVERTERS = new HashMap<>();
-    static {
-        TO_JSON_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof BigDecimal) {
-                    return Decimal.fromLogical(schema, (BigDecimal) value);
-                }
-                throw new DataException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass());
-            }
-        });
-
-        TO_JSON_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof java.util.Date) {
-                    return Date.fromLogical(schema, (java.util.Date) value);
-                }
-                throw new DataException("Invalid type for Date, expected Date but was " + value.getClass());
-            }
-        });
-
-        TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof java.util.Date) {
-                    return Time.fromLogical(schema, (java.util.Date) value);
-                }
-                throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
-            }
-        });
-
-        TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
-            @Override
-            public Object convert(Schema schema, Object value) {
-                if (value instanceof java.util.Date) {
-                    return Timestamp.fromLogical(schema, (java.util.Date) value);
-                }
-                throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass());
-            }
-        });
-    }
-
-
-    // Whether data is wrapped in a schema/payload envelope; may be overridden by "schemas.enable" in configure().
-    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
-    // Capacity handed to the two LRU schema caches that configure() creates.
-    private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
-    // Memoized schema translations, one cache per direction; both are instantiated in configure().
-    private Cache<Schema, ObjectNode> fromCopycatSchemaCache;
-    private Cache<JsonNode, Schema> toCopycatSchemaCache;
-
-    // JSON (de)serializers that do the byte[] <-> JsonNode step; configured alongside this converter.
-    private final JsonSerializer serializer = new JsonSerializer();
-    private final JsonDeserializer deserializer = new JsonDeserializer();
-
-    /**
-     * Configure this converter and its underlying JSON serializer and deserializer.
-     *
-     * Recognized settings are "schemas.enable" (the envelope is used only when its string form is exactly
-     * "true") and "schemas.cache.size" (capacity of each schema cache, given as a number or a numeric string).
-     * Settings that are absent leave the corresponding defaults in place.
-     *
-     * @param configs the configuration map
-     * @param isKey whether this converter is used for keys; passed through to the serializer and deserializer
-     * @throws IllegalArgumentException if "schemas.cache.size" is present but is not an integer
-     */
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
-        if (enableConfigsVal != null)
-            enableSchemas = enableConfigsVal.toString().equals("true");
-
-        serializer.configure(configs, isKey);
-        deserializer.configure(configs, isKey);
-
-        // The setting must be looked up by its config key. Looking it up by the default size (an Integer) can
-        // never match a String-keyed map, which would leave the configured value silently ignored.
-        Object cacheSizeVal = configs.get(SCHEMAS_CACHE_CONFIG);
-        if (cacheSizeVal instanceof Number) {
-            cacheSize = ((Number) cacheSizeVal).intValue();
-        } else if (cacheSizeVal != null) {
-            // Values read from properties files arrive as strings, so accept a numeric string as well.
-            try {
-                cacheSize = Integer.parseInt(cacheSizeVal.toString().trim());
-            } catch (NumberFormatException e) {
-                throw new IllegalArgumentException("Invalid value for " + SCHEMAS_CACHE_CONFIG + ": " + cacheSizeVal, e);
-            }
-        }
-        fromCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
-        toCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
-    }
-
-    @Override
-    public byte[] fromCopycatData(String topic, Schema schema, Object value) {
-        JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, 
value) : convertToJsonWithoutEnvelope(schema, value);
-        try {
-            return serializer.serialize(topic, jsonValue);
-        } catch (SerializationException e) {
-            throw new DataException("Converting Copycat data to byte[] failed 
due to serialization error: ", e);
-        }
-    }
-
-    @Override
-    public SchemaAndValue toCopycatData(String topic, byte[] value) {
-        JsonNode jsonValue;
-        try {
-            jsonValue = deserializer.deserialize(topic, value);
-        } catch (SerializationException e) {
-            throw new DataException("Converting byte[] to Copycat data failed 
due to serialization error: ", e);
-        }
-
-        if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || 
jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
-            throw new DataException("JsonDeserializer with schemas.enable 
requires \"schema\" and \"payload\" fields and may not contain additional 
fields");
-
-        // The deserialized data should either be an envelope object 
containing the schema and the payload or the schema
-        // was stripped during serialization and we need to fill in an 
all-encompassing schema.
-        if (!enableSchemas) {
-            ObjectNode envelope = JsonNodeFactory.instance.objectNode();
-            envelope.set("schema", null);
-            envelope.set("payload", jsonValue);
-            jsonValue = envelope;
-        }
-
-        return jsonToCopycat(jsonValue);
-    }
-
-    private SchemaAndValue jsonToCopycat(JsonNode jsonValue) {
-        if (jsonValue == null)
-            return SchemaAndValue.NULL;
-
-        if (!jsonValue.isObject() || jsonValue.size() != 2 || 
!jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || 
!jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
-            throw new DataException("JSON value converted to Copycat must be 
in envelope containing schema");
-
-        Schema schema = 
asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        return new SchemaAndValue(schema, convertToCopycat(schema, 
jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
-    }
-
-    /**
-     * Translate a Copycat schema into its JSON representation, memoizing the result in fromCopycatSchemaCache.
-     *
-     * The returned node may be the instance held in the cache, so callers must not modify it; take a deepCopy()
-     * first if a modified variant is needed.
-     *
-     * @param schema the Copycat schema to translate, may be null
-     * @return the JSON form of the schema, or null when schema is null
-     * @throws DataException if the schema's type has no JSON translation
-     */
-    private ObjectNode asJsonSchema(Schema schema) {
-        if (schema == null)
-            return null;
-
-        ObjectNode cached = fromCopycatSchemaCache.get(schema);
-        if (cached != null)
-            return cached;
-
-        final ObjectNode jsonSchema;
-        switch (schema.type()) {
-            case BOOLEAN:
-                jsonSchema = JsonSchema.BOOLEAN_SCHEMA.deepCopy();
-                break;
-            case BYTES:
-                jsonSchema = JsonSchema.BYTES_SCHEMA.deepCopy();
-                break;
-            case FLOAT64:
-                jsonSchema = JsonSchema.DOUBLE_SCHEMA.deepCopy();
-                break;
-            case FLOAT32:
-                jsonSchema = JsonSchema.FLOAT_SCHEMA.deepCopy();
-                break;
-            case INT8:
-                jsonSchema = JsonSchema.INT8_SCHEMA.deepCopy();
-                break;
-            case INT16:
-                jsonSchema = JsonSchema.INT16_SCHEMA.deepCopy();
-                break;
-            case INT32:
-                jsonSchema = JsonSchema.INT32_SCHEMA.deepCopy();
-                break;
-            case INT64:
-                jsonSchema = JsonSchema.INT64_SCHEMA.deepCopy();
-                break;
-            case STRING:
-                jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy();
-                break;
-            case ARRAY:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME);
-                jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema()));
-                break;
-            case MAP:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME);
-                jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema()));
-                jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema()));
-                break;
-            case STRUCT:
-                jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME);
-                ArrayNode fields = JsonNodeFactory.instance.arrayNode();
-                for (Field field : schema.fields()) {
-                    // The recursive call may hand back the shared cached node for this field's schema. Copy it
-                    // before adding the field name so the cached entry is not polluted for later lookups.
-                    ObjectNode fieldJsonSchema = asJsonSchema(field.schema()).deepCopy();
-                    fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name());
-                    fields.add(fieldJsonSchema);
-                }
-                jsonSchema.set(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields);
-                break;
-            default:
-                throw new DataException("Couldn't translate unsupported schema type " + schema + ".");
-        }
-
-        // Attributes common to every schema type; optional ones are emitted only when present.
-        jsonSchema.put(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME, schema.isOptional());
-        if (schema.name() != null)
-            jsonSchema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, schema.name());
-        if (schema.version() != null)
-            jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.version());
-        if (schema.doc() != null)
-            jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc());
-        if (schema.parameters() != null) {
-            ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode();
-            for (Map.Entry<String, String> prop : schema.parameters().entrySet())
-                jsonSchemaParams.put(prop.getKey(), prop.getValue());
-            // set() is the non-deprecated way to attach a child node, matching its use elsewhere in this method.
-            jsonSchema.set(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams);
-        }
-        if (schema.defaultValue() != null)
-            jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue()));
-
-        fromCopycatSchemaCache.put(schema, jsonSchema);
-        return jsonSchema;
-    }
-
-
-    private Schema asCopycatSchema(JsonNode jsonSchema) {
-        if (jsonSchema.isNull())
-            return null;
-
-        Schema cached = toCopycatSchemaCache.get(jsonSchema);
-        if (cached != null)
-            return cached;
-
-        JsonNode schemaTypeNode = 
jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME);
-        if (schemaTypeNode == null || !schemaTypeNode.isTextual())
-            throw new DataException("Schema must contain 'type' field");
-
-        final SchemaBuilder builder;
-        switch (schemaTypeNode.textValue()) {
-            case JsonSchema.BOOLEAN_TYPE_NAME:
-                builder = SchemaBuilder.bool();
-                break;
-            case JsonSchema.INT8_TYPE_NAME:
-                builder = SchemaBuilder.int8();
-                break;
-            case JsonSchema.INT16_TYPE_NAME:
-                builder = SchemaBuilder.int16();
-                break;
-            case JsonSchema.INT32_TYPE_NAME:
-                builder = SchemaBuilder.int32();
-                break;
-            case JsonSchema.INT64_TYPE_NAME:
-                builder = SchemaBuilder.int64();
-                break;
-            case JsonSchema.FLOAT_TYPE_NAME:
-                builder = SchemaBuilder.float32();
-                break;
-            case JsonSchema.DOUBLE_TYPE_NAME:
-                builder = SchemaBuilder.float64();
-                break;
-            case JsonSchema.BYTES_TYPE_NAME:
-                builder = SchemaBuilder.bytes();
-                break;
-            case JsonSchema.STRING_TYPE_NAME:
-                builder = SchemaBuilder.string();
-                break;
-            case JsonSchema.ARRAY_TYPE_NAME:
-                JsonNode elemSchema = 
jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
-                if (elemSchema == null)
-                    throw new DataException("Array schema did not specify the 
element type");
-                builder = SchemaBuilder.array(asCopycatSchema(elemSchema));
-                break;
-            case JsonSchema.MAP_TYPE_NAME:
-                JsonNode keySchema = 
jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME);
-                if (keySchema == null)
-                    throw new DataException("Map schema did not specify the 
key type");
-                JsonNode valueSchema = 
jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME);
-                if (valueSchema == null)
-                    throw new DataException("Map schema did not specify the 
value type");
-                builder = SchemaBuilder.map(asCopycatSchema(keySchema), 
asCopycatSchema(valueSchema));
-                break;
-            case JsonSchema.STRUCT_TYPE_NAME:
-                builder = SchemaBuilder.struct();
-                JsonNode fields = 
jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME);
-                if (fields == null || !fields.isArray())
-                    throw new DataException("Struct schema's \"fields\" 
argument is not an array.");
-                for (JsonNode field : fields) {
-                    JsonNode jsonFieldName = 
field.get(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME);
-                    if (jsonFieldName == null || !jsonFieldName.isTextual())
-                        throw new DataException("Struct schema's field name 
not specified properly");
-                    builder.field(jsonFieldName.asText(), 
asCopycatSchema(field));
-                }
-                break;
-            default:
-                throw new DataException("Unknown schema type: " + 
schemaTypeNode.textValue());
-        }
-
-
-        JsonNode schemaOptionalNode = 
jsonSchema.get(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME);
-        if (schemaOptionalNode != null && schemaOptionalNode.isBoolean() && 
schemaOptionalNode.booleanValue())
-            builder.optional();
-        else
-            builder.required();
-
-        JsonNode schemaNameNode = 
jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME);
-        if (schemaNameNode != null && schemaNameNode.isTextual())
-            builder.name(schemaNameNode.textValue());
-
-        JsonNode schemaVersionNode = 
jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME);
-        if (schemaVersionNode != null && schemaVersionNode.isIntegralNumber()) 
{
-            builder.version(schemaVersionNode.intValue());
-        }
-
-        JsonNode schemaDocNode = 
jsonSchema.get(JsonSchema.SCHEMA_DOC_FIELD_NAME);
-        if (schemaDocNode != null && schemaDocNode.isTextual())
-            builder.doc(schemaDocNode.textValue());
-
-        JsonNode schemaParamsNode = 
jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME);
-        if (schemaParamsNode != null && schemaParamsNode.isObject()) {
-            Iterator<Map.Entry<String, JsonNode>> paramsIt = 
schemaParamsNode.fields();
-            while (paramsIt.hasNext()) {
-                Map.Entry<String, JsonNode> entry = paramsIt.next();
-                JsonNode paramValue = entry.getValue();
-                if (!paramValue.isTextual())
-                    throw new DataException("Schema parameters must have 
string values.");
-                builder.parameter(entry.getKey(), paramValue.textValue());
-            }
-        }
-
-        JsonNode schemaDefaultNode = 
jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME);
-        if (schemaDefaultNode != null)
-            builder.defaultValue(convertToCopycat(builder, schemaDefaultNode));
-
-        Schema result = builder.build();
-        toCopycatSchemaCache.put(jsonSchema, result);
-        return result;
-    }
-
-
-    /**
-     * Convert this object, in org.apache.kafka.copycat.data format, into a 
JSON object with an envelope object
-     * containing schema and payload fields.
-     * @param schema the schema for the data
-     * @param value the value
-     * @return JsonNode-encoded version
-     */
-    private JsonNode convertToJsonWithEnvelope(Schema schema, Object value) {
-        return new JsonSchema.Envelope(asJsonSchema(schema), 
convertToJson(schema, value)).toJsonNode();
-    }
-
-    private JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) 
{
-        return convertToJson(schema, value);
-    }
-
-    /**
-     * Convert this object, in the org.apache.kafka.copycat.data format, into 
a JSON object, returning both the schema
-     * and the converted object.
-     */
-    private static JsonNode convertToJson(Schema schema, Object logicalValue) {
-        if (logicalValue == null) {
-            if (schema == null) // Any schema is valid and we don't have a 
default, so treat this as an optional schema
-                return null;
-            if (schema.defaultValue() != null)
-                return convertToJson(schema, schema.defaultValue());
-            if (schema.isOptional())
-                return JsonNodeFactory.instance.nullNode();
-            throw new DataException("Conversion error: null value for field 
that is required and has no default value");
-        }
-
-        Object value = logicalValue;
-        if (schema != null && schema.name() != null) {
-            LogicalTypeConverter logicalConverter = 
TO_JSON_LOGICAL_CONVERTERS.get(schema.name());
-            if (logicalConverter != null)
-                value = logicalConverter.convert(schema, logicalValue);
-        }
-
-        try {
-            final Schema.Type schemaType;
-            if (schema == null) {
-                schemaType = CopycatSchema.schemaType(value.getClass());
-                if (schemaType == null)
-                    throw new DataException("Java class " + value.getClass() + 
" does not have corresponding schema type.");
-            } else {
-                schemaType = schema.type();
-            }
-            switch (schemaType) {
-                case INT8:
-                    return JsonNodeFactory.instance.numberNode((Byte) value);
-                case INT16:
-                    return JsonNodeFactory.instance.numberNode((Short) value);
-                case INT32:
-                    return JsonNodeFactory.instance.numberNode((Integer) 
value);
-                case INT64:
-                    return JsonNodeFactory.instance.numberNode((Long) value);
-                case FLOAT32:
-                    return JsonNodeFactory.instance.numberNode((Float) value);
-                case FLOAT64:
-                    return JsonNodeFactory.instance.numberNode((Double) value);
-                case BOOLEAN:
-                    return JsonNodeFactory.instance.booleanNode((Boolean) 
value);
-                case STRING:
-                    CharSequence charSeq = (CharSequence) value;
-                    return 
JsonNodeFactory.instance.textNode(charSeq.toString());
-                case BYTES:
-                    if (value instanceof byte[])
-                        return JsonNodeFactory.instance.binaryNode((byte[]) 
value);
-                    else if (value instanceof ByteBuffer)
-                        return 
JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array());
-                    else
-                        throw new DataException("Invalid type for bytes type: 
" + value.getClass());
-                case ARRAY: {
-                    Collection collection = (Collection) value;
-                    ArrayNode list = JsonNodeFactory.instance.arrayNode();
-                    for (Object elem : collection) {
-                        Schema valueSchema = schema == null ? null : 
schema.valueSchema();
-                        JsonNode fieldValue = convertToJson(valueSchema, elem);
-                        list.add(fieldValue);
-                    }
-                    return list;
-                }
-                case MAP: {
-                    Map<?, ?> map = (Map<?, ?>) value;
-                    // If true, using string keys and JSON object; if false, 
using non-string keys and Array-encoding
-                    boolean objectMode;
-                    if (schema == null) {
-                        objectMode = true;
-                        for (Map.Entry<?, ?> entry : map.entrySet()) {
-                            if (!(entry.getKey() instanceof String)) {
-                                objectMode = false;
-                                break;
-                            }
-                        }
-                    } else {
-                        objectMode = schema.keySchema().type() == 
Schema.Type.STRING;
-                    }
-                    ObjectNode obj = null;
-                    ArrayNode list = null;
-                    if (objectMode)
-                        obj = JsonNodeFactory.instance.objectNode();
-                    else
-                        list = JsonNodeFactory.instance.arrayNode();
-                    for (Map.Entry<?, ?> entry : map.entrySet()) {
-                        Schema keySchema = schema == null ? null : 
schema.keySchema();
-                        Schema valueSchema = schema == null ? null : 
schema.valueSchema();
-                        JsonNode mapKey = convertToJson(keySchema, 
entry.getKey());
-                        JsonNode mapValue = convertToJson(valueSchema, 
entry.getValue());
-
-                        if (objectMode)
-                            obj.set(mapKey.asText(), mapValue);
-                        else
-                            
list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue));
-                    }
-                    return objectMode ? obj : list;
-                }
-                case STRUCT: {
-                    Struct struct = (Struct) value;
-                    if (struct.schema() != schema)
-                        throw new DataException("Mismatching schema.");
-                    ObjectNode obj = JsonNodeFactory.instance.objectNode();
-                    for (Field field : schema.fields()) {
-                        obj.set(field.name(), convertToJson(field.schema(), 
struct.get(field)));
-                    }
-                    return obj;
-                }
-            }
-
-            throw new DataException("Couldn't convert " + value + " to JSON.");
-        } catch (ClassCastException e) {
-            throw new DataException("Invalid type for " + schema.type() + ": " 
+ value.getClass());
-        }
-    }
-
-
-    private static Object convertToCopycat(Schema schema, JsonNode jsonValue) {
-        JsonToCopycatTypeConverter typeConverter;
-        final Schema.Type schemaType;
-        if (schema != null) {
-            schemaType = schema.type();
-        } else {
-            switch (jsonValue.getNodeType()) {
-                case NULL:
-                    // Special case. With no schema
-                    return null;
-                case BOOLEAN:
-                    schemaType = Schema.Type.BOOLEAN;
-                    break;
-                case NUMBER:
-                    if (jsonValue.isIntegralNumber())
-                        schemaType = Schema.Type.INT64;
-                    else
-                        schemaType = Schema.Type.FLOAT64;
-                    break;
-                case ARRAY:
-                    schemaType = Schema.Type.ARRAY;
-                    break;
-                case OBJECT:
-                    schemaType = Schema.Type.MAP;
-                    break;
-                case STRING:
-                    schemaType = Schema.Type.STRING;
-                    break;
-
-                case BINARY:
-                case MISSING:
-                case POJO:
-                default:
-                    schemaType = null;
-                    break;
-            }
-        }
-        typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType);
-        if (typeConverter == null)
-            throw new DataException("Unknown schema type: " + schema.type());
-
-        Object converted = typeConverter.convert(schema, jsonValue);
-        if (schema != null && schema.name() != null) {
-            LogicalTypeConverter logicalConverter = 
TO_COPYCAT_LOGICAL_CONVERTERS.get(schema.name());
-            if (logicalConverter != null)
-                converted = logicalConverter.convert(schema, converted);
-        }
-        return converted;
-    }
-
-
-    private interface JsonToCopycatTypeConverter {
-        Object convert(Schema schema, JsonNode value);
-    }
-
-    private interface LogicalTypeConverter {
-        Object convert(Schema schema, Object value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
----------------------------------------------------------------------
diff --git 
a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
 
b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
deleted file mode 100644
index 1661754..0000000
--- 
a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.json;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Deserializer;
-
-import java.util.Map;
-
-/**
- * JSON deserializer for Jackson's JsonNode tree model. Using the tree model 
allows it to work with arbitrarily
- * structured data without having associated Java classes. This deserializer 
also supports Copycat schemas.
- */
-public class JsonDeserializer implements Deserializer<JsonNode> {
-    private ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonDeserializer() {
-    }
-
-    @Override
-    public void configure(Map<String, ?> props, boolean isKey) {
-    }
-
-    @Override
-    public JsonNode deserialize(String topic, byte[] bytes) {
-        if (bytes == null)
-            return null;
-
-        JsonNode data;
-        try {
-            data = objectMapper.readTree(bytes);
-        } catch (Exception e) {
-            throw new SerializationException(e);
-        }
-
-        return data;
-    }
-
-    @Override
-    public void close() {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
----------------------------------------------------------------------
diff --git 
a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java 
b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
deleted file mode 100644
index 78712f3..0000000
--- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.json;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
-public class JsonSchema {
-
-    static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema";
-    static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload";
-    static final String SCHEMA_TYPE_FIELD_NAME = "type";
-    static final String SCHEMA_OPTIONAL_FIELD_NAME = "optional";
-    static final String SCHEMA_NAME_FIELD_NAME = "name";
-    static final String SCHEMA_VERSION_FIELD_NAME = "version";
-    static final String SCHEMA_DOC_FIELD_NAME = "doc";
-    static final String SCHEMA_PARAMETERS_FIELD_NAME = "parameters";
-    static final String SCHEMA_DEFAULT_FIELD_NAME = "default";
-    static final String ARRAY_ITEMS_FIELD_NAME = "items";
-    static final String MAP_KEY_FIELD_NAME = "keys";
-    static final String MAP_VALUE_FIELD_NAME = "values";
-    static final String STRUCT_FIELDS_FIELD_NAME = "fields";
-    static final String STRUCT_FIELD_NAME_FIELD_NAME = "field";
-    static final String BOOLEAN_TYPE_NAME = "boolean";
-    static final ObjectNode BOOLEAN_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
BOOLEAN_TYPE_NAME);
-    static final String INT8_TYPE_NAME = "int8";
-    static final ObjectNode INT8_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
INT8_TYPE_NAME);
-    static final String INT16_TYPE_NAME = "int16";
-    static final ObjectNode INT16_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
INT16_TYPE_NAME);
-    static final String INT32_TYPE_NAME = "int32";
-    static final ObjectNode INT32_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
INT32_TYPE_NAME);
-    static final String INT64_TYPE_NAME = "int64";
-    static final ObjectNode INT64_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
INT64_TYPE_NAME);
-    static final String FLOAT_TYPE_NAME = "float";
-    static final ObjectNode FLOAT_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
FLOAT_TYPE_NAME);
-    static final String DOUBLE_TYPE_NAME = "double";
-    static final ObjectNode DOUBLE_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
DOUBLE_TYPE_NAME);
-    static final String BYTES_TYPE_NAME = "bytes";
-    static final ObjectNode BYTES_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
BYTES_TYPE_NAME);
-    static final String STRING_TYPE_NAME = "string";
-    static final ObjectNode STRING_SCHEMA = 
JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, 
STRING_TYPE_NAME);
-    static final String ARRAY_TYPE_NAME = "array";
-    static final String MAP_TYPE_NAME = "map";
-    static final String STRUCT_TYPE_NAME = "struct";
-
-    public static ObjectNode envelope(JsonNode schema, JsonNode payload) {
-        ObjectNode result = JsonNodeFactory.instance.objectNode();
-        result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema);
-        result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload);
-        return result;
-    }
-
-    static class Envelope {
-        public JsonNode schema;
-        public JsonNode payload;
-
-        public Envelope(JsonNode schema, JsonNode payload) {
-            this.schema = schema;
-            this.payload = payload;
-        }
-
-        public ObjectNode toJsonNode() {
-            return envelope(schema, payload);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
----------------------------------------------------------------------
diff --git 
a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java 
b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
deleted file mode 100644
index 129d14b..0000000
--- 
a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-package org.apache.kafka.copycat.json;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.kafka.common.errors.SerializationException;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.util.Map;
-
-/**
- * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree 
model allows handling arbitrarily
- * structured data without corresponding Java classes. This serializer also 
supports Copycat schemas.
- */
-public class JsonSerializer implements Serializer<JsonNode> {
-    private final ObjectMapper objectMapper = new ObjectMapper();
-
-    /**
-     * Default constructor needed by Kafka
-     */
-    public JsonSerializer() {
-
-    }
-
-    @Override
-    public void configure(Map<String, ?> config, boolean isKey) {
-    }
-
-    @Override
-    public byte[] serialize(String topic, JsonNode data) {
-        if (data == null)
-            return null;
-
-        try {
-            return objectMapper.writeValueAsBytes(data);
-        } catch (Exception e) {
-            throw new SerializationException("Error serializing JSON message", 
e);
-        }
-    }
-
-    @Override
-    public void close() {
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
----------------------------------------------------------------------
diff --git 
a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
 
b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
deleted file mode 100644
index 6b40046..0000000
--- 
a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java
+++ /dev/null
@@ -1,644 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.json;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import org.apache.kafka.copycat.data.Date;
-import org.apache.kafka.copycat.data.Decimal;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.kafka.common.cache.Cache;
-import org.apache.kafka.copycat.data.Schema;
-import org.apache.kafka.copycat.data.SchemaAndValue;
-import org.apache.kafka.copycat.data.SchemaBuilder;
-import org.apache.kafka.copycat.data.Struct;
-import org.apache.kafka.copycat.data.Time;
-import org.apache.kafka.copycat.data.Timestamp;
-import org.apache.kafka.copycat.errors.DataException;
-import org.junit.Before;
-import org.junit.Test;
-import org.powermock.reflect.Whitebox;
-
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.GregorianCalendar;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.TimeZone;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class JsonConverterTest {
-    private static final String TOPIC = "topic";
-
-    ObjectMapper objectMapper = new ObjectMapper();
-    JsonConverter converter = new JsonConverter();
-
-    @Before
-    public void setUp() {
-        converter.configure(Collections.EMPTY_MAP, false);
-    }
-
-    // Schema metadata
-
-    @Test
-    public void testCopycatSchemaMetadataTranslation() {
-        // this validates the non-type fields are translated and handled 
properly
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, 
\"payload\": true }".getBytes()));
-        assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", 
\"optional\": true }, \"payload\": null }".getBytes()));
-        assertEquals(new 
SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
-                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": 
\"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
-        assertEquals(new 
SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the 
documentation").parameter("foo", "bar").build(), true),
-                converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": 
\"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": 
\"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true 
}".getBytes()));
-    }
-
-    // Schema types
-
-    @Test
-    public void booleanToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, 
\"payload\": true }".getBytes()));
-        assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, 
\"payload\": false }".getBytes()));
-    }
-
-    @Test
-    public void byteToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, 
\"payload\": 12 }".getBytes()));
-    }
-
-    @Test
-    public void shortToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, 
\"payload\": 12 }".getBytes()));
-    }
-
-    @Test
-    public void intToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, 
\"payload\": 12 }".getBytes()));
-    }
-
-    @Test
-    public void longToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, 
\"payload\": 12 }".getBytes()));
-        assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, 
\"payload\": 4398046511104 }".getBytes()));
-    }
-
-    @Test
-    public void floatToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, 
\"payload\": 12.34 }".getBytes()));
-    }
-
-    @Test
-    public void doubleToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, 
\"payload\": 12.34 }".getBytes()));
-    }
-
-
-    @Test
-    public void bytesToCopycat() throws UnsupportedEncodingException {
-        ByteBuffer reference = 
ByteBuffer.wrap("test-string".getBytes("UTF-8"));
-        String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": 
\"dGVzdC1zdHJpbmc=\" }";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, 
msg.getBytes());
-        ByteBuffer converted = ByteBuffer.wrap((byte[]) 
schemaAndValue.value());
-        assertEquals(reference, converted);
-    }
-
-    @Test
-    public void stringToCopycat() {
-        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), 
converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, 
\"payload\": \"foo-bar-baz\" }".getBytes()));
-    }
-
-    @Test
-    public void arrayToCopycat() {
-        byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { 
\"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes();
-        assertEquals(new 
SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), 
Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson));
-    }
-
-    @Test
-    public void mapToCopycatStringKeys() {
-        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { 
\"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { 
\"key1\": 12, \"key2\": 15} }".getBytes();
-        Map<String, Integer> expected = new HashMap<>();
-        expected.put("key1", 12);
-        expected.put("key2", 15);
-        assertEquals(new 
SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, 
mapJson));
-    }
-
-    @Test
-    public void mapToCopycatNonStringKeys() {
-        byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { 
\"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ 
[1, 12], [2, 15] ] }".getBytes();
-        Map<Integer, Integer> expected = new HashMap<>();
-        expected.put(1, 12);
-        expected.put(2, 15);
-        assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, 
mapJson));
-    }
-
-    @Test
-    public void structToCopycat() {
-        byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": 
[{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", 
\"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": 
\"string\" } }".getBytes();
-        Schema expectedSchema = SchemaBuilder.struct().field("field1", 
Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
-        Struct expected = new Struct(expectedSchema).put("field1", 
true).put("field2", "string");
-        SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson);
-        assertEquals(new SchemaAndValue(expectedSchema, expected), converted);
-    }
-
-    @Test(expected = DataException.class)
-    public void nullToCopycat() {
-        // When schemas are enabled, trying to decode a null should be an 
error -- we should *always* have the envelope
-        assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, 
null));
-    }
-
-    @Test
-    public void nullSchemaPrimitiveToCopycat() {
-        SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ 
\"schema\": null, \"payload\": null }".getBytes());
-        assertEquals(SchemaAndValue.NULL, converted);
-
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": true }".getBytes());
-        assertEquals(new SchemaAndValue(null, true), converted);
-
-        // Integers: Copycat has more data types, and JSON unfortunately mixes 
all number types. We try to preserve
-        // info as best we can, so we always use the largest integer and 
floating point numbers we can and have Jackson
-        // determine if it's an integer or not
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": 12 }".getBytes());
-        assertEquals(new SchemaAndValue(null, 12L), converted);
-
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": 12.24 }".getBytes());
-        assertEquals(new SchemaAndValue(null, 12.24), converted);
-
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": \"a string\" }".getBytes());
-        assertEquals(new SchemaAndValue(null, "a string"), converted);
-
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": [1, \"2\", 3] }".getBytes());
-        assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), 
converted);
-
-        converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, 
\"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes());
-        Map<String, Long> obj = new HashMap<>();
-        obj.put("field1", 1L);
-        obj.put("field2", 2L);
-        assertEquals(new SchemaAndValue(null, obj), converted);
-    }
-
-    @Test
-    public void decimalToCopycat() {
-        Schema schema = Decimal.schema(2);
-        BigDecimal reference = new BigDecimal(new BigInteger("156"), 2);
-        // Payload is base64 encoded byte[]{0, -100}, which is the two's 
complement encoding of 156.
-        String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": 
\"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, \"parameters\": { 
\"scale\": \"2\" } }, \"payload\": \"AJw=\" }";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, 
msg.getBytes());
-        BigDecimal converted = (BigDecimal) schemaAndValue.value();
-        assertEquals(schema, schemaAndValue.schema());
-        assertEquals(reference, converted);
-    }
-
-    @Test
-    public void dateToCopycat() {
-        Schema schema = Date.SCHEMA;
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.DATE, 10000);
-        java.util.Date reference = calendar.getTime();
-        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.copycat.data.Date\", \"version\": 1 }, \"payload\": 10000 }";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, 
msg.getBytes());
-        java.util.Date converted = (java.util.Date) schemaAndValue.value();
-        assertEquals(schema, schemaAndValue.schema());
-        assertEquals(reference, converted);
-    }
-
-    @Test
-    public void timeToCopycat() {
-        Schema schema = Time.SCHEMA;
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.MILLISECOND, 14400000);
-        java.util.Date reference = calendar.getTime();
-        String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": 
\"org.apache.kafka.copycat.data.Time\", \"version\": 1 }, \"payload\": 14400000 
}";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, 
msg.getBytes());
-        java.util.Date converted = (java.util.Date) schemaAndValue.value();
-        assertEquals(schema, schemaAndValue.schema());
-        assertEquals(reference, converted);
-    }
-
-    @Test
-    public void timestampToCopycat() {
-        Schema schema = Timestamp.SCHEMA;
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.MILLISECOND, 2000000000);
-        calendar.add(Calendar.MILLISECOND, 2000000000);
-        java.util.Date reference = calendar.getTime();
-        String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": 
\"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }, \"payload\": 
4000000000 }";
-        SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, 
msg.getBytes());
-        java.util.Date converted = (java.util.Date) schemaAndValue.value();
-        assertEquals(schema, schemaAndValue.schema());
-        assertEquals(reference, converted);
-    }
-
-    // Schema metadata
-
-    @Test
-    public void testJsonSchemaMetadataTranslation() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.BOOLEAN_SCHEMA, true));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(true, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
-
-        converted = parse(converter.fromCopycatData(TOPIC, 
Schema.OPTIONAL_BOOLEAN_SCHEMA, null));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull());
-
-        converted = parse(converter.fromCopycatData(TOPIC, 
SchemaBuilder.bool().defaultValue(true).build(), true));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, 
\"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(true, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
-
-        converted = parse(converter.fromCopycatData(TOPIC, 
SchemaBuilder.bool().required().name("bool").version(3).doc("the 
documentation").parameter("foo", "bar").build(), true));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, 
\"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\", 
\"parameters\": { \"foo\": \"bar\" }}"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(true, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
-    }
-
-
-    @Test
-    public void testCacheSchemaToCopycatConversion() {
-        Cache<JsonNode, Schema> cache = Whitebox.getInternalState(converter, 
"toCopycatSchemaCache");
-        assertEquals(0, cache.size());
-
-        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" 
}, \"payload\": true }".getBytes());
-        assertEquals(1, cache.size());
-
-        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" 
}, \"payload\": true }".getBytes());
-        assertEquals(1, cache.size());
-
-        // Different schema should also get cached
-        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", 
\"optional\": true }, \"payload\": true }".getBytes());
-        assertEquals(2, cache.size());
-
-        // Even an equivalent, but different, JSON encoding of the schema should get a 
different cache entry
-        converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", 
\"optional\": false }, \"payload\": true }".getBytes());
-        assertEquals(3, cache.size());
-    }
-
-    // Schema types
-
-    @Test
-    public void booleanToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.BOOLEAN_SCHEMA, true));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(true, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
-    }
-
-    @Test
-    public void byteToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.INT8_SCHEMA, (byte) 12));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
-    }
-
-    @Test
-    public void shortToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.INT16_SCHEMA, (short) 12));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
-    }
-
-    @Test
-    public void intToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.INT32_SCHEMA, 12));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue());
-    }
-
-    @Test
-    public void longToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.INT64_SCHEMA, 4398046511104L));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(4398046511104L, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue());
-    }
-
-    @Test
-    public void floatToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.FLOAT32_SCHEMA, 12.34f));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12.34f, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001);
-    }
-
-    @Test
-    public void doubleToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.FLOAT64_SCHEMA, 12.34));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(12.34, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001);
-    }
-
-    @Test
-    public void bytesToJson() throws IOException {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.BYTES_SCHEMA, "test-string".getBytes()));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(ByteBuffer.wrap("test-string".getBytes()),
-                
ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()));
-    }
-
-    @Test
-    public void stringToJson() {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Schema.STRING_SCHEMA, "test-string"));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), 
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals("test-string", 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
-    }
-
-    @Test
-    public void arrayToJson() {
-        Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
int32Array, Arrays.asList(1, 2, 3)));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": 
\"int32\", \"optional\": false }, \"optional\": false }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-    @Test
-    public void mapToJsonStringKeys() {
-        Schema stringIntMap = SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.INT32_SCHEMA).build();
-        Map<String, Integer> input = new HashMap<>();
-        input.put("key1", 12);
-        input.put("key2", 15);
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
stringIntMap, input));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : 
\"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", 
\"optional\": false }, \"optional\": false }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 
12).put("key2", 15),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-    @Test
-    public void mapToJsonNonStringKeys() {
-        Schema intIntMap = SchemaBuilder.map(Schema.INT32_SCHEMA, 
Schema.INT32_SCHEMA).build();
-        Map<Integer, Integer> input = new HashMap<>();
-        input.put(1, 12);
-        input.put(2, 15);
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, 
input));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : 
\"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", 
\"optional\": false }, \"optional\": false }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
-        ArrayNode payload = (ArrayNode) 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
-        assertEquals(2, payload.size());
-        Set<JsonNode> payloadEntries = new HashSet<>();
-        for (JsonNode elem : payload)
-            payloadEntries.add(elem);
-        assertEquals(new 
HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12),
-                        JsonNodeFactory.instance.arrayNode().add(2).add(15))),
-                payloadEntries
-        );
-    }
-
-    @Test
-    public void structToJson() {
-        Schema schema = SchemaBuilder.struct().field("field1", 
Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build();
-        Struct input = new Struct(schema).put("field1", true).put("field2", 
"string");
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, 
input));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, 
\"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": 
false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] 
}"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertEquals(JsonNodeFactory.instance.objectNode()
-                        .put("field1", true)
-                        .put("field2", "string"),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-
-    @Test
-    public void decimalToJson() throws IOException {
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, 
\"name\": \"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, 
\"parameters\": { \"scale\": \"2\" } }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertArrayEquals(new byte[]{0, -100}, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue());
-    }
-
-    @Test
-    public void dateToJson() throws IOException {
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.DATE, 10000);
-        java.util.Date date = calendar.getTime();
-
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Date.SCHEMA, date));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, 
\"name\": \"org.apache.kafka.copycat.data.Date\", \"version\": 1 }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        JsonNode payload = 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
-        assertTrue(payload.isInt());
-        assertEquals(10000, payload.intValue());
-    }
-
-    @Test
-    public void timeToJson() throws IOException {
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.MILLISECOND, 14400000);
-        java.util.Date date = calendar.getTime();
-
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Time.SCHEMA, date));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, 
\"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        JsonNode payload = 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
-        assertTrue(payload.isInt());
-        assertEquals(14400000, payload.longValue());
-    }
-
-    @Test
-    public void timestampToJson() throws IOException {
-        GregorianCalendar calendar = new GregorianCalendar(1970, 
Calendar.JANUARY, 1, 0, 0, 0);
-        calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
-        calendar.add(Calendar.MILLISECOND, 2000000000);
-        calendar.add(Calendar.MILLISECOND, 2000000000);
-        java.util.Date date = calendar.getTime();
-
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, 
Timestamp.SCHEMA, date));
-        validateEnvelope(converted);
-        assertEquals(parse("{ \"type\": \"int64\", \"optional\": false, 
\"name\": \"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }"),
-                converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        JsonNode payload = 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
-        assertTrue(payload.isLong());
-        assertEquals(4000000000L, payload.longValue());
-    }
-
-
-    @Test
-    public void nullSchemaAndPrimitiveToJson() {
-        // This still needs to do conversion of data, null schema means 
"anything goes"
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, 
true));
-        validateEnvelopeNullSchema(converted);
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
-        assertEquals(true, 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue());
-    }
-
-    @Test
-    public void nullSchemaAndArrayToJson() {
-        // This still needs to do conversion of data, null schema means 
"anything goes". Make sure we mix and match
-        // types to verify conversion still works.
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, 
Arrays.asList(1, "string", true)));
-        validateEnvelopeNullSchema(converted);
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
-        
assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-    @Test
-    public void nullSchemaAndMapToJson() {
-        // This still needs to do conversion of data, null schema means 
"anything goes". Make sure we mix and match
-        // types to verify conversion still works.
-        Map<String, Object> input = new HashMap<>();
-        input.put("key1", 12);
-        input.put("key2", "string");
-        input.put("key3", true);
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, 
input));
-        validateEnvelopeNullSchema(converted);
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
-        assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 
12).put("key2", "string").put("key3", true),
-                converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-    @Test
-    public void nullSchemaAndMapNonStringKeysToJson() {
-        // This still needs to do conversion of data, null schema means 
"anything goes". Make sure we mix and match
-        // types to verify conversion still works.
-        Map<Object, Object> input = new HashMap<>();
-        input.put("string", 12);
-        input.put(52, "string");
-        input.put(false, true);
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, 
input));
-        validateEnvelopeNullSchema(converted);
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
-        
assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray());
-        ArrayNode payload = (ArrayNode) 
converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
-        assertEquals(3, payload.size());
-        Set<JsonNode> payloadEntries = new HashSet<>();
-        for (JsonNode elem : payload)
-            payloadEntries.add(elem);
-        assertEquals(new 
HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12),
-                        
JsonNodeFactory.instance.arrayNode().add(52).add("string"),
-                        
JsonNodeFactory.instance.arrayNode().add(false).add(true))),
-                payloadEntries
-        );
-    }
-
-
-    @Test(expected = DataException.class)
-    public void mismatchSchemaJson() {
-        // If we have mismatching schema info, we should properly convert to a 
DataException
-        converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true);
-    }
-
-
-
-    @Test
-    public void noSchemaToCopycat() {
-        Map<String, Boolean> props = 
Collections.singletonMap("schemas.enable", false);
-        converter.configure(props, true);
-        assertEquals(new SchemaAndValue(null, true), 
converter.toCopycatData(TOPIC, "true".getBytes()));
-    }
-
-    @Test
-    public void noSchemaToJson() {
-        Map<String, Boolean> props = 
Collections.singletonMap("schemas.enable", false);
-        converter.configure(props, true);
-        JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, 
true));
-        assertTrue(converted.isBoolean());
-        assertEquals(true, converted.booleanValue());
-    }
-
-    @Test
-    public void testCacheSchemaToJsonConversion() {
-        Cache<Schema, ObjectNode> cache = Whitebox.getInternalState(converter, 
"fromCopycatSchemaCache");
-        assertEquals(0, cache.size());
-
-        // Repeated conversion of the same schema, even if the schema object 
is different should return the same Java
-        // object
-        converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true);
-        assertEquals(1, cache.size());
-
-        converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true);
-        assertEquals(1, cache.size());
-
-        // Validate that a similar, but different schema correctly returns a 
different schema.
-        converter.fromCopycatData(TOPIC, 
SchemaBuilder.bool().optional().build(), true);
-        assertEquals(2, cache.size());
-    }
-
-
-    private JsonNode parse(byte[] json) {
-        try {
-            return objectMapper.readTree(json);
-        } catch (IOException e) {
-            fail("IOException during JSON parse: " + e.getMessage());
-            throw new RuntimeException("failed");
-        }
-    }
-
-    private JsonNode parse(String json) {
-        try {
-            return objectMapper.readTree(json);
-        } catch (IOException e) {
-            fail("IOException during JSON parse: " + e.getMessage());
-            throw new RuntimeException("failed");
-        }
-    }
-
-    private void validateEnvelope(JsonNode env) {
-        assertNotNull(env);
-        assertTrue(env.isObject());
-        assertEquals(2, env.size());
-        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject());
-        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-
-    private void validateEnvelopeNullSchema(JsonNode env) {
-        assertNotNull(env);
-        assertTrue(env.isObject());
-        assertEquals(2, env.size());
-        assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
-        assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull());
-        assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
deleted file mode 100644
index 8dfefaa..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.cli;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.runtime.Copycat;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.distributed.DistributedConfig;
-import org.apache.kafka.copycat.runtime.distributed.DistributedHerder;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * <p>
- * Command line utility that runs Copycat in distributed mode. In this mode, 
the process joins a group of other workers
- * and work is distributed among them. This is useful for running Copycat as a 
service, where connectors can be
- * submitted to the cluster to be automatically executed in a scalable, 
distributed fashion. This also allows you to
- * easily scale out horizontally, elastically adding or removing capacity 
simply by starting or stopping worker
- * instances.
- * </p>
- */
-@InterfaceStability.Unstable
-public class CopycatDistributed {
-    private static final Logger log = 
LoggerFactory.getLogger(CopycatDistributed.class);
-
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
-            log.info("Usage: CopycatDistributed worker.properties");
-            System.exit(1);
-        }
-
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
-
-        DistributedConfig config = new DistributedConfig(workerProps);
-        Worker worker = new Worker(config, new KafkaOffsetBackingStore());
-        RestServer rest = new RestServer(config);
-        DistributedHerder herder = new DistributedHerder(config, worker, 
rest.advertisedUrl());
-        final Copycat copycat = new Copycat(worker, herder, rest);
-        copycat.start();
-
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        copycat.awaitStop();
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
 
b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
deleted file mode 100644
index 3869552..0000000
--- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.cli;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.runtime.ConnectorConfig;
-import org.apache.kafka.copycat.runtime.Copycat;
-import org.apache.kafka.copycat.runtime.Herder;
-import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.rest.RestServer;
-import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
-import org.apache.kafka.copycat.storage.FileOffsetBackingStore;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.FutureCallback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * <p>
- * Command line utility that runs Copycat as a standalone process. In this 
mode, work is not
- * distributed. Instead, all the normal Copycat machinery works within a 
single process. This is
- * useful for ad hoc, small, or experimental jobs.
- * </p>
- * <p>
- * By default, no job configs or offset data is persistent. You can make jobs 
persistent and
- * fault tolerant by overriding the settings to use file storage for both.
- * </p>
- */
-@InterfaceStability.Unstable
-public class CopycatStandalone {
-    private static final Logger log = 
LoggerFactory.getLogger(CopycatStandalone.class);
-
-    public static void main(String[] args) throws Exception {
-
-        if (args.length < 2) {
-            log.info("Usage: CopycatStandalone worker.properties 
connector1.properties [connector2.properties ...]");
-            System.exit(1);
-        }
-
-        String workerPropsFile = args[0];
-        Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : 
Collections.<String, String>emptyMap();
-
-        StandaloneConfig config = new StandaloneConfig(workerProps);
-        Worker worker = new Worker(config, new FileOffsetBackingStore());
-        RestServer rest = new RestServer(config);
-        Herder herder = new StandaloneHerder(worker);
-        final Copycat copycat = new Copycat(worker, herder, rest);
-        copycat.start();
-
-        try {
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, 
args.length)) {
-                Map<String, String> connectorProps = 
Utils.propsToStringMap(Utils.loadProps(connectorPropsFile));
-                FutureCallback<Herder.Created<ConnectorInfo>> cb = new 
FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
-                    @Override
-                    public void onCompletion(Throwable error, 
Herder.Created<ConnectorInfo> info) {
-                        if (error != null)
-                            log.error("Failed to create job for {}", 
connectorPropsFile);
-                        else
-                            log.info("Created connector {}", 
info.result().name());
-                    }
-                });
-                herder.putConnectorConfig(
-                        connectorProps.get(ConnectorConfig.NAME_CONFIG),
-                        connectorProps, false, cb);
-                cb.get();
-            }
-        } catch (Throwable t) {
-            log.error("Stopping after connector error", t);
-            copycat.stop();
-        }
-
-        // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
-        copycat.awaitStop();
-    }
-}

Reply via email to