http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
deleted file mode 100644
index 104abf1..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class CopycatSchema implements Schema {
-    /**
-     * Maps Schema.Types to a list of Java classes that can be used to 
represent them.
-     */
-    private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new 
HashMap<>();
-    /**
-     * Maps known logical types to a list of Java classes that can be used to 
represent them.
-     */
-    private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new 
HashMap<>();
-
-    /**
-     * Maps the Java classes to the corresponding Schema.Type.
-     */
-    private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new 
HashMap<>();
-
-    static {
-        SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class));
-        SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) 
Short.class));
-        SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) 
Integer.class));
-        SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class));
-        SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) 
Float.class));
-        SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) 
Double.class));
-        SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) 
Boolean.class));
-        SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) 
String.class));
-        // Bytes are special and have 2 representations. byte[] causes 
problems because it doesn't handle equals() and
-        // hashCode() like we want objects to, so we support both byte[] and 
ByteBuffer. Using plain byte[] can cause
-        // those methods to fail, so ByteBuffers are recommended
-        SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) 
byte[].class, (Class) ByteBuffer.class));
-        SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class));
-        SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class));
-        SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) 
Struct.class));
-
-        for (Map.Entry<Type, List<Class>> schemaClasses : 
SCHEMA_TYPE_CLASSES.entrySet()) {
-            for (Class<?> schemaClass : schemaClasses.getValue())
-                JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, 
schemaClasses.getKey());
-        }
-
-        LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) 
BigDecimal.class));
-        LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) 
java.util.Date.class));
-        LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) 
java.util.Date.class));
-        LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) 
java.util.Date.class));
-        // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since 
that's only used to determine schemas for
-        // schemaless data and logical types will have ambiguous schemas (e.g. 
many of them use the same Java class) so
-        // they should not be used without schemas.
-    }
-
-    // The type of the field
-    private final Type type;
-    private final boolean optional;
-    private final Object defaultValue;
-
-    private final List<Field> fields;
-    private final Map<String, Field> fieldsByName;
-
-    private final Schema keySchema;
-    private final Schema valueSchema;
-
-    // Optional name and version provide a built-in way to indicate what type 
of data is included. Most
-    // useful for structs to indicate the semantics of the struct and map it 
to some existing underlying
-    // serializer-specific schema. However, can also be useful in specifying 
other logical types (e.g. a set is an array
-    // with additional constraints).
-    private final String name;
-    private final Integer version;
-    // Optional human readable documentation describing this schema.
-    private final String doc;
-    private final Map<String, String> parameters;
-
-    /**
-     * Construct a Schema. Most users should not construct schemas manually, 
preferring {@link SchemaBuilder} instead.
-     */
-    public CopycatSchema(Type type, boolean optional, Object defaultValue, 
String name, Integer version, String doc, Map<String, String> parameters, 
List<Field> fields, Schema keySchema, Schema valueSchema) {
-        this.type = type;
-        this.optional = optional;
-        this.defaultValue = defaultValue;
-        this.name = name;
-        this.version = version;
-        this.doc = doc;
-        this.parameters = parameters;
-
-        this.fields = fields;
-        if (this.fields != null && this.type == Type.STRUCT) {
-            this.fieldsByName = new HashMap<>();
-            for (Field field : fields)
-                fieldsByName.put(field.name(), field);
-        } else {
-            this.fieldsByName = null;
-        }
-
-        this.keySchema = keySchema;
-        this.valueSchema = valueSchema;
-    }
-
-    /**
-     * Construct a Schema for a primitive type, setting schema parameters, 
struct fields, and key and value schemas to null.
-     */
-    public CopycatSchema(Type type, boolean optional, Object defaultValue, 
String name, Integer version, String doc) {
-        this(type, optional, defaultValue, name, version, doc, null, null, 
null, null);
-    }
-
-    /**
-     * Construct a default schema for a primitive type. The schema is 
required, has no default value, name, version,
-     * or documentation.
-     */
-    public CopycatSchema(Type type) {
-        this(type, false, null, null, null, null);
-    }
-
-    @Override
-    public Type type() {
-        return type;
-    }
-
-    @Override
-    public boolean isOptional() {
-        return optional;
-    }
-
-    @Override
-    public Object defaultValue() {
-        return defaultValue;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    @Override
-    public Integer version() {
-        return version;
-    }
-
-    @Override
-    public String doc() {
-        return doc;
-    }
-
-    @Override
-    public Map<String, String> parameters() {
-        return parameters;
-    }
-
-    @Override
-    public List<Field> fields() {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot list fields on non-struct type");
-        return fields;
-    }
-
-    public Field field(String fieldName) {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot look up fields on non-struct 
type");
-        return fieldsByName.get(fieldName);
-    }
-
-    @Override
-    public Schema keySchema() {
-        if (type != Type.MAP)
-            throw new DataException("Cannot look up key schema on non-map 
type");
-        return keySchema;
-    }
-
-    @Override
-    public Schema valueSchema() {
-        if (type != Type.MAP && type != Type.ARRAY)
-            throw new DataException("Cannot look up value schema on non-array 
and non-map type");
-        return valueSchema;
-    }
-
-
-
-    /**
-     * Validate that the value can be used with the schema, i.e. that its type 
matches the schema type and nullability
-     * requirements. Throws a DataException if the value is invalid.
-     * @param schema Schema to test
-     * @param value value to test
-     */
-    public static void validateValue(Schema schema, Object value) {
-        if (value == null) {
-            if (!schema.isOptional())
-                throw new DataException("Invalid value: null used for required 
field");
-            else
-                return;
-        }
-
-        List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
-
-        if (expectedClasses == null)
-                expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
-
-        if (expectedClasses == null)
-            throw new DataException("Invalid Java object for schema type " + 
schema.type() + ": " + value.getClass());
-
-        boolean foundMatch = false;
-        for (Class<?> expectedClass : expectedClasses) {
-            if (expectedClass.isInstance(value)) {
-                foundMatch = true;
-                break;
-            }
-        }
-        if (!foundMatch)
-            throw new DataException("Invalid Java object for schema type " + 
schema.type() + ": " + value.getClass());
-
-        switch (schema.type()) {
-            case STRUCT:
-                Struct struct = (Struct) value;
-                if (!struct.schema().equals(schema))
-                    throw new DataException("Struct schemas do not match.");
-                struct.validate();
-                break;
-            case ARRAY:
-                List<?> array = (List<?>) value;
-                for (Object entry : array)
-                    validateValue(schema.valueSchema(), entry);
-                break;
-            case MAP:
-                Map<?, ?> map = (Map<?, ?>) value;
-                for (Map.Entry<?, ?> entry : map.entrySet()) {
-                    validateValue(schema.keySchema(), entry.getKey());
-                    validateValue(schema.valueSchema(), entry.getValue());
-                }
-                break;
-        }
-    }
-
-    /**
-     * Validate that the value can be used for this schema, i.e. that its type 
matches the schema type and optional
-     * requirements. Throws a DataException if the value is invalid.
-     * @param value the value to validate
-     */
-    public void validateValue(Object value) {
-        validateValue(this, value);
-    }
-
-    @Override
-    public CopycatSchema schema() {
-        return this;
-    }
-
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        CopycatSchema schema = (CopycatSchema) o;
-        return Objects.equals(optional, schema.optional) &&
-                Objects.equals(type, schema.type) &&
-                Objects.equals(defaultValue, schema.defaultValue) &&
-                Objects.equals(fields, schema.fields) &&
-                Objects.equals(keySchema, schema.keySchema) &&
-                Objects.equals(valueSchema, schema.valueSchema) &&
-                Objects.equals(name, schema.name) &&
-                Objects.equals(version, schema.version) &&
-                Objects.equals(doc, schema.doc) &&
-                Objects.equals(parameters, schema.parameters);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(type, optional, defaultValue, fields, keySchema, 
valueSchema, name, version, doc, parameters);
-    }
-
-    @Override
-    public String toString() {
-        if (name != null)
-            return "Schema{" + name + ":" + type + "}";
-        else
-            return "Schema{" + type + "}";
-    }
-
-
-    /**
-     * Get the {@link Type} associated with the the given class.
-     *
-     * @param klass the Class to
-     * @return the corresponding type, nor null if there is no matching type
-     */
-    public static Type schemaType(Class<?> klass) {
-        synchronized (JAVA_CLASS_SCHEMA_TYPES) {
-            Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass);
-            if (schemaType != null)
-                return schemaType;
-
-            // Since the lookup only checks the class, we need to also try
-            for (Map.Entry<Class<?>, Type> entry : 
JAVA_CLASS_SCHEMA_TYPES.entrySet()) {
-                try {
-                    klass.asSubclass(entry.getKey());
-                    // Cache this for subsequent lookups
-                    JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue());
-                    return entry.getValue();
-                } catch (ClassCastException e) {
-                    // Expected, ignore
-                }
-            }
-        }
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
deleted file mode 100644
index 4e14659..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * <p>
- *     A date representing a calendar day with no time of day or timezone. The 
corresponding Java type is a java.util.Date
- *     with hours, minutes, seconds, milliseconds set to 0. The underlying 
representation is an integer representing the
- *     number of standardized days (based on a number of milliseconds with 24 
hours/day, 60 minutes/hour, 60 seconds/minute,
- *     1000 milliseconds/second with n) since Unix epoch.
- * </p>
- */
-public class Date {
-    public static final String LOGICAL_NAME = 
"org.apache.kafka.copycat.data.Date";
-
-    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
-
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    /**
-     * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you 
can override additional schema settings such
-     * as required/optional, default value, and documentation.
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder() {
-        return SchemaBuilder.int32()
-                .name(LOGICAL_NAME)
-                .version(1);
-    }
-
-    public static final Schema SCHEMA = builder().schema();
-
-    /**
-     * Convert a value from its logical format (Date) to it's encoded format.
-     * @param value the logical value
-     * @return the encoded value
-     */
-    public static int fromLogical(Schema schema, java.util.Date value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
-        Calendar calendar = Calendar.getInstance(UTC);
-        calendar.setTime(value);
-        if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || 
calendar.get(Calendar.MINUTE) != 0 ||
-                calendar.get(Calendar.SECOND) != 0 || 
calendar.get(Calendar.MILLISECOND) != 0) {
-            throw new DataException("Copycat Date type should not have any 
time fields set to non-zero values.");
-        }
-        long unixMillis = calendar.getTimeInMillis();
-        return (int) (unixMillis / MILLIS_PER_DAY);
-    }
-
-    public static java.util.Date toLogical(Schema schema, int value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
-        return new java.util.Date(value * MILLIS_PER_DAY);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
deleted file mode 100644
index f23e13e..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-
-/**
- * <p>
- *     An arbitrary-precision signed decimal number. The value is unscaled * 
10 ^ -scale where:
- *     <ul>
- *         <li>unscaled is an integer </li>
- *         <li>scale is an integer representing how many digits the decimal 
point should be shifted on the unscaled value</li>
- *     </ul>
- * </p>
- * <p>
- *     Decimal does not provide a fixed schema because it is parameterized by 
the scale, which is fixed on the schema
- *     rather than being part of the value.
- * </p>
- * <p>
- *     The underlying representation of this type is bytes containing a two's 
complement integer
- * </p>
- */
-public class Decimal {
-    public static final String LOGICAL_NAME = 
"org.apache.kafka.copycat.data.Decimal";
-    public static final String SCALE_FIELD = "scale";
-
-    /**
-     * Returns a SchemaBuilder for a Decimal with the given scale factor. By 
returning a SchemaBuilder you can override
-     * additional schema settings such as required/optional, default value, 
and documentation.
-     * @param scale the scale factor to apply to unscaled values
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder(int scale) {
-        return SchemaBuilder.bytes()
-                .name(LOGICAL_NAME)
-                .parameter(SCALE_FIELD, ((Integer) scale).toString())
-                .version(1);
-    }
-
-    public static Schema schema(int scale) {
-        return builder(scale).build();
-    }
-
-    /**
-     * Convert a value from its logical format (BigDecimal) to it's encoded 
format.
-     * @param value the logical value
-     * @return the encoded value
-     */
-    public static byte[] fromLogical(Schema schema, BigDecimal value) {
-        if (value.scale() != scale(schema))
-            throw new DataException("BigDecimal has mismatching scale value 
for given Decimal schema");
-        return value.unscaledValue().toByteArray();
-    }
-
-    public static BigDecimal toLogical(Schema schema, byte[] value) {
-        return new BigDecimal(new BigInteger(value), scale(schema));
-    }
-
-    private static int scale(Schema schema) {
-        String scaleString = schema.parameters().get(SCALE_FIELD);
-        if (scaleString == null)
-            throw new DataException("Invalid Decimal schema: scale parameter 
not found.");
-        try {
-            return Integer.parseInt(scaleString);
-        } catch (NumberFormatException e) {
-            throw new DataException("Invalid scale parameter found in Decimal 
schema: ", e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
deleted file mode 100644
index c71cdb4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.Objects;
-
-/**
- * <p>
- *     A field in a {@link Struct}, consisting of a field name, index, and 
{@link Schema} for the field value.
- * </p>
- */
-public class Field {
-    private final String name;
-    private final int index;
-    private final Schema schema;
-
-    public Field(String name, int index, Schema schema) {
-        this.name = name;
-        this.index = index;
-        this.schema = schema;
-    }
-
-    /**
-     * Get the name of this field.
-     * @return the name of this field
-     */
-    public String name() {
-        return name;
-    }
-
-
-    /**
-     * Get the index of this field within the struct.
-     * @return the index of this field
-     */
-    public int index() {
-        return index;
-    }
-
-    /**
-     * Get the schema of this field
-     * @return the schema of values of this field
-     */
-    public Schema schema() {
-        return schema;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Field field = (Field) o;
-        return Objects.equals(index, field.index) &&
-                Objects.equals(name, field.name) &&
-                Objects.equals(schema, field.schema);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(name, index, schema);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
deleted file mode 100644
index 3db01ae..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- *     Definition of an abstract data type. Data types can be primitive types 
(integer types, floating point types,
- *     boolean, strings, and bytes) or complex types (typed arrays, maps with 
one key schema and value schema,
- *     and structs that have a fixed set of field names each with an 
associated value schema). Any type can be specified
- *     as optional, allowing it to be omitted (resulting in null values when 
it is missing) and can specify a default
- *     value.
- * </p>
- * <p>
- *     All schemas may have some associated metadata: a name, version, and 
documentation. These are all considered part
- *     of the schema itself and included when comparing schemas. Besides 
adding important metadata, these fields enable
- *     the specification of logical types that specify additional constraints 
and semantics (e.g. UNIX timestamps are
- *     just an int64, but the user needs the know about the additional 
semantics to interpret it properly).
- * </p>
- * <p>
- *     Schemas can be created directly, but in most cases using {@link 
SchemaBuilder} will be simpler.
- * </p>
- */
-public interface Schema {
-    /**
-     * The type of a schema. These only include the core types; logical types 
must be determined by checking the schema name.
-     */
-    enum Type {
-        INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, 
ARRAY, MAP, STRUCT;
-
-        private String name;
-
-        Type() {
-            this.name = this.name().toLowerCase();
-        }
-
-        public String getName() {
-            return name;
-        }
-
-        public boolean isPrimitive() {
-            switch (this) {
-                case INT8:
-                case INT16:
-                case INT32:
-                case INT64:
-                case FLOAT32:
-                case FLOAT64:
-                case BOOLEAN:
-                case STRING:
-                case BYTES:
-                    return true;
-            }
-            return false;
-        }
-    }
-
-
-    Schema INT8_SCHEMA = SchemaBuilder.int8().build();
-    Schema INT16_SCHEMA = SchemaBuilder.int16().build();
-    Schema INT32_SCHEMA = SchemaBuilder.int32().build();
-    Schema INT64_SCHEMA = SchemaBuilder.int64().build();
-    Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
-    Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
-    Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
-    Schema STRING_SCHEMA = SchemaBuilder.string().build();
-    Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
-
-    Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build();
-    Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build();
-    Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build();
-    Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build();
-    Schema OPTIONAL_FLOAT32_SCHEMA = 
SchemaBuilder.float32().optional().build();
-    Schema OPTIONAL_FLOAT64_SCHEMA = 
SchemaBuilder.float64().optional().build();
-    Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build();
-    Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
-    Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
-
-    /**
-     * @return the type of this schema
-     */
-    Type type();
-
-    /**
-     * @return true if this field is optional, false otherwise
-     */
-    boolean isOptional();
-
-    /**
-     * @return the default value for this schema
-     */
-    Object defaultValue();
-
-    /**
-     * @return the name of this schema
-     */
-    String name();
-
-    /**
-     * Get the optional version of the schema. If a version is included, newer 
versions *must* be larger than older ones.
-     * @return the version of this schema
-     */
-    Integer version();
-
-    /**
-     * @return the documentation for this schema
-     */
-    String doc();
-
-    /**
-     * Get a map of schema parameters.
-     * @return Map containing parameters for this schema, or null if there are 
no parameters
-     */
-    Map<String, String> parameters();
-
-    /**
-     * Get the key schema for this map schema. Throws a DataException if this 
schema is not a map.
-     * @return the key schema
-     */
-    Schema keySchema();
-
-    /**
-     * Get the value schema for this map or array schema. Throws a 
DataException if this schema is not a map or array.
-     * @return the value schema
-     */
-    Schema valueSchema();
-
-    /**
-     * Get the list of fields for this Schema. Throws a DataException if this 
schema is not a struct.
-     * @return the list of fields for this Schema
-     */
-    List<Field> fields();
-
-    /**
-     * Get a field for this Schema by name. Throws a DataException if this 
schema is not a struct.
-     * @param fieldName the name of the field to look up
-     * @return the Field object for the specified field, or null if there is 
no field with the given name
-     */
-    Field field(String fieldName);
-
-    /**
-     * Return a concrete instance of the {@link Schema}
-     * @return the {@link Schema}
-     */
-    Schema schema();
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
deleted file mode 100644
index 368a8cf..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import java.util.Objects;
-
-public class SchemaAndValue {
-    private final Schema schema;
-    private final Object value;
-
-    public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
-
-    public SchemaAndValue(Schema schema, Object value) {
-        this.value = value;
-        this.schema = schema;
-    }
-
-    public Schema schema() {
-        return schema;
-    }
-
-    public Object value() {
-        return value;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        SchemaAndValue that = (SchemaAndValue) o;
-        return Objects.equals(schema, that.schema) &&
-                Objects.equals(value, that.value);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(schema, value);
-    }
-
-    @Override
-    public String toString() {
-        return "SchemaAndValue{" +
-                "schema=" + schema +
-                ", value=" + value +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
deleted file mode 100644
index 21ae54c..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java
+++ /dev/null
@@ -1,412 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-import org.apache.kafka.copycat.errors.SchemaBuilderException;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- *     SchemaBuilder provides a fluent API for constructing {@link Schema} 
objects. It allows you to set each of the
- *     properties for the schema and each call returns the SchemaBuilder so 
the calls can be chained. When nested types
- *     are required, use one of the predefined schemas from {@link Schema} or 
use a second SchemaBuilder inline.
- * </p>
- * <p>
- *     Here is an example of building a struct schema:
- *     <pre>
- *     Schema dateSchema = SchemaBuilder.struct()
- *         .name("com.example.CalendarDate").version(2).doc("A calendar date 
including month, day, and year.")
- *         .field("month", Schema.STRING_SCHEMA)
- *         .field("day", Schema.INT8_SCHEMA)
- *         .field("year", Schema.INT16_SCHEMA)
- *         .build();
- *     </pre>
- * </p>
- * <p>
- *     Here is an example of using a second SchemaBuilder to construct 
complex, nested types:
- *     <pre>
- *     Schema userListSchema = SchemaBuilder.array(
- *         SchemaBuilder.struct().name("com.example.User").field("username", 
Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
- *     ).build();
- *     </pre>
- * </p>
- */
-public class SchemaBuilder implements Schema {
-    private static final String TYPE_FIELD = "type";
-    private static final String OPTIONAL_FIELD = "optional";
-    private static final String DEFAULT_FIELD = "default";
-    private static final String NAME_FIELD = "name";
-    private static final String VERSION_FIELD = "version";
-    private static final String DOC_FIELD = "doc";
-
-
-    private final Type type;
-    private Boolean optional = null;
-    private Object defaultValue = null;
-
-    private List<Field> fields = null;
-    private Schema keySchema = null;
-    private Schema valueSchema = null;
-
-    private String name;
-    private Integer version;
-    // Optional human readable documentation describing this schema.
-    private String doc;
-    // Additional parameters for logical types.
-    private Map<String, String> parameters;
-
-    private SchemaBuilder(Type type) {
-        this.type = type;
-    }
-
-    // Common/metadata fields
-
-    @Override
-    public boolean isOptional() {
-        return optional == null ? false : optional;
-    }
-
-    /**
-     * Set this schema as optional.
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder optional() {
-        checkNull(OPTIONAL_FIELD, optional);
-        optional = true;
-        return this;
-    }
-
-    /**
-     * Set this schema as required. This is the default, but this method can 
be used to make this choice explicit.
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder required() {
-        checkNull(OPTIONAL_FIELD, optional);
-        optional = false;
-        return this;
-    }
-
-    @Override
-    public Object defaultValue() {
-        return defaultValue;
-    }
-
-    /**
-     * Set the default value for this schema. The value is validated against 
the schema type, throwing a
-     * {@link SchemaBuilderException} if it does not match.
-     * @param value the default value
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder defaultValue(Object value) {
-        checkNull(DEFAULT_FIELD, defaultValue);
-        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
-        try {
-            CopycatSchema.validateValue(this, value);
-        } catch (DataException e) {
-            throw new SchemaBuilderException("Invalid default value", e);
-        }
-        defaultValue = value;
-        return this;
-    }
-
-    @Override
-    public String name() {
-        return name;
-    }
-
-    /**
-     * Set the name of this schema.
-     * @param name the schema name
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder name(String name) {
-        checkNull(NAME_FIELD, this.name);
-        this.name = name;
-        return this;
-    }
-
-    @Override
-    public Integer version() {
-        return version;
-    }
-
-    /**
-     * Set the version of this schema. Schema versions are integers which, if 
provided, must indicate which schema is
-     * newer and which is older by their ordering.
-     * @param version the schema version
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder version(Integer version) {
-        checkNull(VERSION_FIELD, this.version);
-        this.version = version;
-        return this;
-    }
-
-    @Override
-    public String doc() {
-        return doc;
-    }
-
-    /**
-     * Set the documentation for this schema.
-     * @param doc the documentation
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder doc(String doc) {
-        checkNull(DOC_FIELD, this.doc);
-        this.doc = doc;
-        return this;
-    }
-
-    @Override
-    public Map<String, String> parameters() {
-        return Collections.unmodifiableMap(parameters);
-    }
-
-    /**
-     * Set a schema parameter.
-     * @param propertyName name of the schema property to define
-     * @param propertyValue value of the schema property to define, as a String
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder parameter(String propertyName, String propertyValue) {
-        // Preserve order of insertion with a LinkedHashMap. This isn't 
strictly necessary, but is nice if logical types
-        // can print their properties in a consistent order.
-        if (parameters == null)
-            parameters = new LinkedHashMap<>();
-        parameters.put(propertyName, propertyValue);
-        return this;
-    }
-
-    /**
-     * Set schema parameters. This operation is additive; it does not remove 
existing parameters that do not appear in
-     * the set of properties pass to this method.
-     * @param props Map of properties to set
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder parameters(Map<String, String> props) {
-        // Avoid creating an empty set of properties so we never have an empty 
map
-        if (props.isEmpty())
-            return this;
-        if (parameters == null)
-            parameters = new LinkedHashMap<>();
-        parameters.putAll(props);
-        return this;
-    }
-
-    @Override
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Create a SchemaBuilder for the specified type.
-     *
-     * Usually it will be simpler to use one of the variants like {@link 
#string()} or {@link #struct()}, but this form
-     * can be useful when generating schemas dynamically.
-     *
-     * @param type the schema type
-     * @return a new SchemaBuilder
-     */
-    public static SchemaBuilder type(Type type) {
-        return new SchemaBuilder(type);
-    }
-
-    // Primitive types
-
-    /**
-     * @return a new {@link Type#INT8} SchemaBuilder
-     */
-    public static SchemaBuilder int8() {
-        return new SchemaBuilder(Type.INT8);
-    }
-
-    /**
-     * @return a new {@link Type#INT16} SchemaBuilder
-     */
-    public static SchemaBuilder int16() {
-        return new SchemaBuilder(Type.INT16);
-    }
-
-    /**
-     * @return a new {@link Type#INT32} SchemaBuilder
-     */
-    public static SchemaBuilder int32() {
-        return new SchemaBuilder(Type.INT32);
-    }
-
-    /**
-     * @return a new {@link Type#INT64} SchemaBuilder
-     */
-    public static SchemaBuilder int64() {
-        return new SchemaBuilder(Type.INT64);
-    }
-
-    /**
-     * @return a new {@link Type#FLOAT32} SchemaBuilder
-     */
-    public static SchemaBuilder float32() {
-        return new SchemaBuilder(Type.FLOAT32);
-    }
-
-    /**
-     * @return a new {@link Type#FLOAT64} SchemaBuilder
-     */
-    public static SchemaBuilder float64() {
-        return new SchemaBuilder(Type.FLOAT64);
-    }
-
-    /**
-     * @return a new {@link Type#BOOLEAN} SchemaBuilder
-     */
-    public static SchemaBuilder bool() {
-        return new SchemaBuilder(Type.BOOLEAN);
-    }
-
-    /**
-     * @return a new {@link Type#STRING} SchemaBuilder
-     */
-    public static SchemaBuilder string() {
-        return new SchemaBuilder(Type.STRING);
-    }
-
-    /**
-     * @return a new {@link Type#BYTES} SchemaBuilder
-     */
-    public static SchemaBuilder bytes() {
-        return new SchemaBuilder(Type.BYTES);
-    }
-
-
-    // Structs
-
-    /**
-     * @return a new {@link Type#STRUCT} SchemaBuilder
-     */
-    public static SchemaBuilder struct() {
-        return new SchemaBuilder(Type.STRUCT);
-    }
-
-    /**
-     * Add a field to this struct schema. Throws a SchemaBuilderException if 
this is not a struct schema.
-     * @param fieldName the name of the field to add
-     * @param fieldSchema the Schema for the field's value
-     * @return the SchemaBuilder
-     */
-    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
-        if (type != Type.STRUCT)
-            throw new SchemaBuilderException("Cannot create fields on type " + 
type);
-        if (fields == null)
-            fields = new ArrayList<>();
-        int fieldIndex = fields.size();
-        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
-        return this;
-    }
-
-    /**
-     * Get the list of fields for this Schema. Throws a DataException if this 
schema is not a struct.
-     * @return the list of fields for this Schema
-     */
-    public List<Field> fields() {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot list fields on non-struct type");
-        return fields;
-    }
-
-    public Field field(String fieldName) {
-        if (type != Type.STRUCT)
-            throw new DataException("Cannot look up fields on non-struct 
type");
-        for (Field field : fields)
-            if (field.name() == fieldName)
-                return field;
-        return null;
-    }
-
-
-
-    // Maps & Arrays
-
-    /**
-     * @param valueSchema the schema for elements of the array
-     * @return a new {@link Type#ARRAY} SchemaBuilder
-     */
-    public static SchemaBuilder array(Schema valueSchema) {
-        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
-        builder.valueSchema = valueSchema;
-        return builder;
-    }
-
-    /**
-     * @param keySchema the schema for keys in the map
-     * @param valueSchema the schema for values in the map
-     * @return a new {@link Type#MAP} SchemaBuilder
-     */
-    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
-        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
-        builder.keySchema = keySchema;
-        builder.valueSchema = valueSchema;
-        return builder;
-    }
-
-    @Override
-    public Schema keySchema() {
-        return keySchema;
-    }
-
-    @Override
-    public Schema valueSchema() {
-        return valueSchema;
-    }
-
-
-    /**
-     * Build the Schema using the current settings
-     * @return the {@link Schema}
-     */
-    public Schema build() {
-        return new CopycatSchema(type, isOptional(), defaultValue, name, 
version, doc,
-                parameters == null ? null : 
Collections.unmodifiableMap(parameters),
-                fields == null ? null : Collections.unmodifiableList(fields), 
keySchema, valueSchema);
-    }
-
-    /**
-     * Return a concrete instance of the {@link Schema} specified by this 
builder
-     * @return the {@link Schema}
-     */
-    @Override
-    public Schema schema() {
-        return build();
-    }
-
-
-    private static void checkNull(String fieldName, Object val) {
-        if (val != null)
-            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + 
fieldName + " has already been set.");
-    }
-
-    private static void checkNotNull(String fieldName, Object val, String 
fieldToSet) {
-        if (val == null)
-            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + 
fieldName + " must be specified to set " + fieldToSet);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
deleted file mode 100644
index 3ab9e7f..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java
+++ /dev/null
@@ -1,197 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.data.Schema.Type;
-import org.apache.kafka.copycat.errors.SchemaProjectorException;
-
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-
-/**
- * <p>
- *     SchemaProjector is utility to project a value between compatible 
schemas and throw exceptions
- *     when non compatible schemas are provided.
- * </p>
- */
-
-public class SchemaProjector {
-
-    private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> 
promotable = new HashSet<>();
-
-    static {
-        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, 
Type.INT64, Type.FLOAT32, Type.FLOAT64};
-        for (int i = 0; i < promotableTypes.length; ++i) {
-            for (int j = i; j < promotableTypes.length; ++j) {
-                promotable.add(new 
AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
-            }
-        }
-    }
-
-    /**
-     * This method project a value between compatible schemas and throw 
exceptions when non compatible schemas are provided
-     * @param source the schema used to construct the record
-     * @param record the value to project from source schema to target schema
-     * @param target the schema to project the record to
-     * @return the projected value with target schema
-     * @throws SchemaProjectorException
-     */
-    public static Object project(Schema source, Object record, Schema target) 
throws SchemaProjectorException {
-        checkMaybeCompatible(source, target);
-        if (source.isOptional() && !target.isOptional()) {
-            if (target.defaultValue() != null) {
-                if (record != null) {
-                    return projectRequiredSchema(source, record, target);
-                } else {
-                    return target.defaultValue();
-                }
-            } else {
-                throw new SchemaProjectorException("Writer schema is optional, 
however, target schema does not provide a default value.");
-            }
-        } else {
-            if (record != null) {
-                return projectRequiredSchema(source, record, target);
-            } else {
-                return null;
-            }
-        }
-    }
-
-    private static Object projectRequiredSchema(Schema source, Object record, 
Schema target) throws SchemaProjectorException {
-        switch (target.type()) {
-            case INT8:
-            case INT16:
-            case INT32:
-            case INT64:
-            case FLOAT32:
-            case FLOAT64:
-            case BOOLEAN:
-            case BYTES:
-            case STRING:
-                return projectPrimitive(source, record, target);
-            case STRUCT:
-                return projectStruct(source, (Struct) record, target);
-            case ARRAY:
-                return projectArray(source, record, target);
-            case MAP:
-                return projectMap(source, record, target);
-        }
-        return null;
-    }
-
-    private static Object projectStruct(Schema source, Struct sourceStruct, 
Schema target) throws SchemaProjectorException {
-        Struct targetStruct = new Struct(target);
-        for (Field targetField : target.fields()) {
-            String fieldName = targetField.name();
-            Field sourceField = source.field(fieldName);
-            if (sourceField != null) {
-                Object sourceFieldValue = sourceStruct.get(fieldName);
-                try {
-                    Object targetFieldValue = project(sourceField.schema(), 
sourceFieldValue, targetField.schema());
-                    targetStruct.put(fieldName, targetFieldValue);
-                } catch (SchemaProjectorException e) {
-                    throw new SchemaProjectorException("Error projecting " + 
sourceField.name(), e);
-                }
-            } else {
-                Object targetDefault;
-                if (targetField.schema().defaultValue() != null) {
-                    targetDefault = targetField.schema().defaultValue();
-                } else {
-                    throw new SchemaProjectorException("Cannot project " + 
source.schema() + " to " + target.schema());
-                }
-                targetStruct.put(fieldName, targetDefault);
-            }
-        }
-        return targetStruct;
-    }
-
-
-    private static void checkMaybeCompatible(Schema source, Schema target) {
-        if (source.type() != target.type() && !isPromotable(source.type(), 
target.type())) {
-            throw new SchemaProjectorException("Schema type mismatch. source 
type: " + source.type() + " and target type: " + target.type());
-        } else if (!Objects.equals(source.name(), target.name())) {
-            throw new SchemaProjectorException("Schema name mismatch. source 
name: " + source.name() + " and target name: " + target.name());
-        } else if (!Objects.equals(source.parameters(), target.parameters())) {
-            throw new SchemaProjectorException("Schema parameters not equal. 
source parameters: " + source.parameters() + " and target parameters: " + 
target.parameters());
-        }
-    }
-
-    private static Object projectArray(Schema source, Object record, Schema 
target) throws SchemaProjectorException {
-        List<?> array = (List<?>) record;
-        List<Object> retArray = new ArrayList<>();
-        for (Object entry : array) {
-            retArray.add(project(source.valueSchema(), entry, 
target.valueSchema()));
-        }
-        return retArray;
-    }
-
-    private static Object projectMap(Schema source, Object record, Schema 
target) throws SchemaProjectorException {
-        Map<?, ?> map = (Map<?, ?>) record;
-        Map<Object, Object> retMap = new HashMap<>();
-        for (Map.Entry<?, ?> entry : map.entrySet()) {
-            Object key = entry.getKey();
-            Object value = entry.getValue();
-            Object retKey = project(source.keySchema(), key, 
target.keySchema());
-            Object retValue = project(source.valueSchema(), value, 
target.valueSchema());
-            retMap.put(retKey, retValue);
-        }
-        return retMap;
-    }
-
-    private static Object projectPrimitive(Schema source, Object record, 
Schema target) throws SchemaProjectorException {
-        assert source.type().isPrimitive();
-        assert target.type().isPrimitive();
-        Object result;
-        if (isPromotable(source.type(), target.type())) {
-            Number numberRecord = (Number) record;
-            switch (target.type()) {
-                case INT8:
-                    result = numberRecord.byteValue();
-                    break;
-                case INT16:
-                    result = numberRecord.shortValue();
-                    break;
-                case INT32:
-                    result = numberRecord.intValue();
-                    break;
-                case INT64:
-                    result = numberRecord.longValue();
-                    break;
-                case FLOAT32:
-                    result = numberRecord.floatValue();
-                    break;
-                case FLOAT64:
-                    result = numberRecord.doubleValue();
-                    break;
-                default:
-                    throw new SchemaProjectorException("Not promotable type.");
-            }
-        } else {
-            result = record;
-        }
-        return result;
-    }
-
-    private static boolean isPromotable(Type sourceType, Type targetType) {
-        return promotable.contains(new 
AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
deleted file mode 100644
index bd757c4..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-/**
- * <p>
- *     A structured record containing a set of named fields with values, each 
field using an independent {@link Schema}.
- *     Struct objects must specify a complete {@link Schema} up front, and 
only fields specified in the Schema may be set.
- * </p>
- * <p>
- *     The Struct's {@link #put(String, Object)} method returns the Struct 
itself to provide a fluent API for constructing
- *     complete objects:
- *     <pre>
- *         Schema schema = SchemaBuilder.struct().name("com.example.Person")
- *             .field("name", Schema.STRING_SCHEMA).field("age", 
Schema.INT32_SCHEMA).build()
- *         Struct struct = new Struct(schema).put("name", "Bobby 
McGee").put("age", 21)
- *     </pre>
- * </p>
- */
-public class Struct {
-
-    private final Schema schema;
-    private final Object[] values;
-
-    /**
-     * Create a new Struct for this {@link Schema}
-     * @param schema the {@link Schema} for the Struct
-     */
-    public Struct(Schema schema) {
-        if (schema.type() != Schema.Type.STRUCT)
-            throw new DataException("Not a struct schema: " + schema);
-        this.schema = schema;
-        this.values = new Object[schema.fields().size()];
-    }
-
-    /**
-     * Get the schema for this Struct.
-     * @return the Struct's schema
-     */
-    public Schema schema() {
-        return schema;
-    }
-
-    /**
-     * Get the value of a field, returning the default value if no value has 
been set yet and a default value is specified
-     * in the field's schema. Because this handles fields of all types, the 
value is returned as an {@link Object} and
-     * must be cast to a more specific type.
-     * @param fieldName the field name to lookup
-     * @return the value for the field
-     */
-    public Object get(String fieldName) {
-        Field field = lookupField(fieldName);
-        return get(field);
-    }
-
-    /**
-     * Get the value of a field, returning the default value if no value has 
been set yet and a default value is specified
-     * in the field's schema. Because this handles fields of all types, the 
value is returned as an {@link Object} and
-     * must be cast to a more specific type.
-     * @param field the field to lookup
-     * @return the value for the field
-     */
-    public Object get(Field field) {
-        Object val = values[field.index()];
-        if (val == null && schema.defaultValue() != null) {
-            val = schema.defaultValue();
-        }
-        return val;
-    }
-
-    /**
-     * Get the underlying raw value for the field without accounting for 
default values.
-     * @param fieldName the field to get the value of
-     * @return the raw value
-     */
-    public Object getWithoutDefault(String fieldName) {
-        Field field = lookupField(fieldName);
-        return values[field.index()];
-    }
-
-    // Note that all getters have to have boxed return types since the fields 
might be optional
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Byte.
-     */
-    public Byte getInt8(String fieldName) {
-        return (Byte) getCheckType(fieldName, Schema.Type.INT8);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Short.
-     */
-    public Short getInt16(String fieldName) {
-        return (Short) getCheckType(fieldName, Schema.Type.INT16);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Integer.
-     */
-    public Integer getInt32(String fieldName) {
-        return (Integer) getCheckType(fieldName, Schema.Type.INT32);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Long.
-     */
-    public Long getInt64(String fieldName) {
-        return (Long) getCheckType(fieldName, Schema.Type.INT64);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Float.
-     */
-    public Float getFloat32(String fieldName) {
-        return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Double.
-     */
-    public Double getFloat64(String fieldName) {
-        return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Boolean.
-     */
-    public Boolean getBoolean(String fieldName) {
-        return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
String.
-     */
-    public String getString(String fieldName) {
-        return (String) getCheckType(fieldName, Schema.Type.STRING);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
byte[].
-     */
-    public byte[] getBytes(String fieldName) {
-        Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
-        if (bytes instanceof ByteBuffer)
-            return ((ByteBuffer) bytes).array();
-        return (byte[]) bytes;
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
List.
-     */
-    public <T> List<T> getArray(String fieldName) {
-        return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Map.
-     */
-    public <K, V> Map<K, V> getMap(String fieldName) {
-        return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
-    }
-
-    /**
-     * Equivalent to calling {@link #get(String)} and casting the result to a 
Struct.
-     */
-    public Struct getStruct(String fieldName) {
-        return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
-    }
-
-    /**
-     * Set the value of a field. Validates the value, throwing a {@link 
DataException} if it does not match the field's
-     * {@link Schema}.
-     * @param fieldName the name of the field to set
-     * @param value the value of the field
-     * @return the Struct, to allow chaining of {@link #put(String, Object)} 
calls
-     */
-    public Struct put(String fieldName, Object value) {
-        Field field = lookupField(fieldName);
-        return put(field, value);
-    }
-
-    /**
-     * Set the value of a field. Validates the value, throwing a {@link 
DataException} if it does not match the field's
-     * {@link Schema}.
-     * @param field the field to set
-     * @param value the value of the field
-     * @return the Struct, to allow chaining of {@link #put(String, Object)} 
calls
-     */
-    public Struct put(Field field, Object value) {
-        CopycatSchema.validateValue(field.schema(), value);
-        values[field.index()] = value;
-        return this;
-    }
-
-
-    /**
-     * Validates that this struct has filled in all the necessary data with 
valid values. For required fields
-     * without defaults, this validates that a value has been set and has 
matching types/schemas. If any validation
-     * fails, throws a DataException.
-     */
-    public void validate() {
-        for (Field field : schema.fields()) {
-            Schema fieldSchema = field.schema();
-            Object value = values[field.index()];
-            if (value == null && (fieldSchema.isOptional() || 
fieldSchema.defaultValue() != null))
-                continue;
-            CopycatSchema.validateValue(fieldSchema, value);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-        Struct struct = (Struct) o;
-        return Objects.equals(schema, struct.schema) &&
-                Arrays.equals(values, struct.values);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(schema, Arrays.hashCode(values));
-    }
-
-    private Field lookupField(String fieldName) {
-        Field field = schema.field(fieldName);
-        if (field == null)
-            throw new DataException(fieldName + " is not a valid field name");
-        return field;
-    }
-
-    // Get the field's value, but also check that the field matches the 
specified type, throwing an exception if it doesn't.
-    // Used to implement the get*() methods that return typed data instead of 
Object
-    private Object getCheckType(String fieldName, Schema.Type type) {
-        Field field = lookupField(fieldName);
-        if (field.schema().type() != type)
-            throw new DataException("Field '" + fieldName + "' is not of type 
" + type);
-        return values[field.index()];
-    }
-
-}
-

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
deleted file mode 100644
index e3255e0..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.Calendar;
-import java.util.TimeZone;
-
-/**
- * <p>
- *     A time representing a specific point in a day, not tied to any specific 
date. The corresponding Java type is a
- *     java.util.Date where only hours, minutes, seconds, and milliseconds can 
be non-zero. This effectively makes it a
- *     point in time during the first day after the Unix epoch. The underlying 
representation is an integer
- *     representing the number of milliseconds after midnight.
- * </p>
- */
-public class Time {
-    public static final String LOGICAL_NAME = 
"org.apache.kafka.copycat.data.Time";
-
-    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
-
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    /**
-     * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you 
can override additional schema settings such
-     * as required/optional, default value, and documentation.
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder() {
-        return SchemaBuilder.int32()
-                .name(LOGICAL_NAME)
-                .version(1);
-    }
-
-    public static final Schema SCHEMA = builder().schema();
-
-    /**
-     * Convert a value from its logical format (Time) to it's encoded format.
-     * @param value the logical value
-     * @return the encoded value
-     */
-    public static int fromLogical(Schema schema, java.util.Date value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Time object but 
the schema does not match.");
-        Calendar calendar = Calendar.getInstance(UTC);
-        calendar.setTime(value);
-        long unixMillis = calendar.getTimeInMillis();
-        if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) {
-            throw new DataException("Copycat Time type should not have any 
date fields set to non-zero values.");
-        }
-        return (int) unixMillis;
-    }
-
-    public static java.util.Date toLogical(Schema schema, int value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
-        if (value  < 0 || value > MILLIS_PER_DAY)
-            throw new DataException("Time values must use number of 
milliseconds greater than 0 and less than 86400000");
-        return new java.util.Date(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
deleted file mode 100644
index 62d371c..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.data;
-
-import org.apache.kafka.copycat.errors.DataException;
-
-import java.util.TimeZone;
-
-/**
- * <p>
- *     A timestamp representing an absolute time, without timezone 
information. The corresponding Java type is a
- *     java.util.Date. The underlying representation is a long representing 
the number of milliseconds since Unix epoch.
- * </p>
- */
-public class Timestamp {
-    public static final String LOGICAL_NAME = 
"org.apache.kafka.copycat.data.Timestamp";
-
-    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
-
-    /**
-     * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder 
you can override additional schema settings such
-     * as required/optional, default value, and documentation.
-     * @return a SchemaBuilder
-     */
-    public static SchemaBuilder builder() {
-        return SchemaBuilder.int64()
-                .name(LOGICAL_NAME)
-                .version(1);
-    }
-
-    public static final Schema SCHEMA = builder().schema();
-
-    /**
-     * Convert a value from its logical format (Date) to it's encoded format.
-     * @param value the logical value
-     * @return the encoded value
-     */
-    public static long fromLogical(Schema schema, java.util.Date value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Timestamp object 
but the schema does not match.");
-        return value.getTime();
-    }
-
-    public static java.util.Date toLogical(Schema schema, long value) {
-        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
-            throw new DataException("Requested conversion of Timestamp object 
but the schema does not match.");
-        return new java.util.Date(value);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
deleted file mode 100644
index c8f1bad..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-/**
- * CopycatException is the top-level exception type generated by Copycat and 
connectors.
- */
-@InterfaceStability.Unstable
-public class CopycatException extends KafkaException {
-
-    public CopycatException(String s) {
-        super(s);
-    }
-
-    public CopycatException(String s, Throwable throwable) {
-        super(s, throwable);
-    }
-
-    public CopycatException(Throwable throwable) {
-        super(throwable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
deleted file mode 100644
index 11139a4..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Base class for all Copycat data API exceptions.
- */
-public class DataException extends CopycatException {
-    public DataException(String s) {
-        super(s);
-    }
-
-    public DataException(String s, Throwable throwable) {
-        super(s, throwable);
-    }
-
-    public DataException(Throwable throwable) {
-        super(throwable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
deleted file mode 100644
index 6f9f233..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-/**
- * Indicates that a method has been invoked illegally or at an invalid time by 
a connector or task.
- */
-public class IllegalWorkerStateException extends CopycatException {
-    public IllegalWorkerStateException(String s) {
-        super(s);
-    }
-
-    public IllegalWorkerStateException(String s, Throwable throwable) {
-        super(s, throwable);
-    }
-
-    public IllegalWorkerStateException(Throwable throwable) {
-        super(throwable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
deleted file mode 100644
index b5a93af..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.errors;
-
-public class SchemaBuilderException extends DataException {
-    public SchemaBuilderException(String s) {
-        super(s);
-    }
-
-    public SchemaBuilderException(String s, Throwable throwable) {
-        super(s, throwable);
-    }
-
-    public SchemaBuilderException(Throwable throwable) {
-        super(throwable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
 
b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
deleted file mode 100644
index be21418..0000000
--- 
a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- **/
-package org.apache.kafka.copycat.errors;
-
-public class SchemaProjectorException extends DataException {
-    public SchemaProjectorException(String s) {
-        super(s);
-    }
-
-    public SchemaProjectorException(String s, Throwable throwable) {
-        super(s, throwable);
-    }
-
-    public SchemaProjectorException(Throwable throwable) {
-        super(throwable);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
deleted file mode 100644
index fb2e694..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.Connector;
-
-/**
- * SinkConnectors implement the Connector interface to send Kafka data to 
another system.
- */
-@InterfaceStability.Unstable
-public abstract class SinkConnector extends Connector {
-
-    /**
-     * <p>
-     * Configuration key for the list of input topics for this connector.
-     * </p>
-     * <p>
-     * Usually this setting is only relevant to the Copycat framework, but is 
provided here for
-     * the convenience of Connector developers if they also need to know the 
set of topics.
-     * </p>
-     */
-    public static final String TOPICS_CONFIG = "topics";
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java 
b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
deleted file mode 100644
index 79ac725..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.sink;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.connector.CopycatRecord;
-import org.apache.kafka.copycat.data.Schema;
-
-/**
- * SinkRecord is a CopycatRecord that has been read from Kafka and includes 
the kafkaOffset of
- * the record in the Kafka topic-partition in addition to the standard fields. 
This information
- * should be used by the SinkTask to coordinate kafkaOffset commits.
- */
-@InterfaceStability.Unstable
-public class SinkRecord extends CopycatRecord {
-    private final long kafkaOffset;
-
-    public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset) {
-        super(topic, partition, keySchema, key, valueSchema, value);
-        this.kafkaOffset = kafkaOffset;
-    }
-
-    public long kafkaOffset() {
-        return kafkaOffset;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o)
-            return true;
-        if (o == null || getClass() != o.getClass())
-            return false;
-        if (!super.equals(o))
-            return false;
-
-        SinkRecord that = (SinkRecord) o;
-
-        if (kafkaOffset != that.kafkaOffset)
-            return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode() {
-        int result = super.hashCode();
-        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
-        return result;
-    }
-
-    @Override
-    public String toString() {
-        return "SinkRecord{" +
-                "kafkaOffset=" + kafkaOffset +
-                "} " + super.toString();
-    }
-}

Reply via email to