http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java
new file mode 100644
index 0000000..842da66
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A date representing a calendar day with no time of day or timezone. The corresponding Java type is a java.util.Date
+ *     with hours, minutes, seconds, milliseconds set to 0. The underlying representation is an integer representing the
+ *     number of standardized days (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 seconds/minute,
+ *     1000 milliseconds/second) since the Unix epoch.
+ * </p>
+ */
+public class Date {
+    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Date";
+
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int32()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Date) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static int fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
+        Calendar calendar = Calendar.getInstance(UTC);
+        calendar.setTime(value);
+        if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || 
calendar.get(Calendar.MINUTE) != 0 ||
+                calendar.get(Calendar.SECOND) != 0 || 
calendar.get(Calendar.MILLISECOND) != 0) {
+            throw new DataException("Kafka Connect Date type should not have 
any time fields set to non-zero values.");
+        }
+        long unixMillis = calendar.getTimeInMillis();
+        return (int) (unixMillis / MILLIS_PER_DAY);
+    }
+
+    public static java.util.Date toLogical(Schema schema, int value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
+        return new java.util.Date(value * MILLIS_PER_DAY);
+    }
+}
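
For illustration only (not part of this patch), a sketch of how the Date logical type round-trips through its int32 encoding; the calendar date used here is an arbitrary example value, and the org.apache.kafka.connect.data and java.util imports are assumed:

    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.clear();                                    // all time-of-day fields must stay zero
    cal.set(2015, Calendar.OCTOBER, 23);
    java.util.Date day = cal.getTime();

    int encoded = Date.fromLogical(Date.SCHEMA, day);               // days since the Unix epoch
    java.util.Date decoded = Date.toLogical(Date.SCHEMA, encoded);  // midnight UTC of the same day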

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
new file mode 100644
index 0000000..e15f698
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+/**
+ * <p>
+ *     An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where:
+ *     <ul>
+ *         <li>unscaled is an integer </li>
+ *         <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li>
+ *     </ul>
+ * </p>
+ * <p>
+ *     Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema
+ *     rather than being part of the value.
+ * </p>
+ * <p>
+ *     The underlying representation of this type is bytes containing a two's complement integer
+ * </p>
+ */
+public class Decimal {
+    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal";
+    public static final String SCALE_FIELD = "scale";
+
+    /**
+     * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override
+     * additional schema settings such as required/optional, default value, and documentation.
+     * @param scale the scale factor to apply to unscaled values
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder(int scale) {
+        return SchemaBuilder.bytes()
+                .name(LOGICAL_NAME)
+                .parameter(SCALE_FIELD, ((Integer) scale).toString())
+                .version(1);
+    }
+
+    public static Schema schema(int scale) {
+        return builder(scale).build();
+    }
+
+    /**
+     * Convert a value from its logical format (BigDecimal) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static byte[] fromLogical(Schema schema, BigDecimal value) {
+        if (value.scale() != scale(schema))
+            throw new DataException("BigDecimal has mismatching scale value 
for given Decimal schema");
+        return value.unscaledValue().toByteArray();
+    }
+
+    public static BigDecimal toLogical(Schema schema, byte[] value) {
+        return new BigDecimal(new BigInteger(value), scale(schema));
+    }
+
+    private static int scale(Schema schema) {
+        String scaleString = schema.parameters().get(SCALE_FIELD);
+        if (scaleString == null)
+            throw new DataException("Invalid Decimal schema: scale parameter 
not found.");
+        try {
+            return Integer.parseInt(scaleString);
+        } catch (NumberFormatException e) {
+            throw new DataException("Invalid scale parameter found in Decimal 
schema: ", e);
+        }
+    }
+}
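
A usage sketch (not part of the patch) of the scale-parameterized encoding; the scale of 2 and the value are arbitrary example choices:

    Schema priceSchema = Decimal.schema(2);                          // scale is fixed on the schema
    BigDecimal price = new BigDecimal("12.34");                      // the value's scale must match the schema's
    byte[] encoded = Decimal.fromLogical(priceSchema, price);        // two's complement unscaled value 1234
    BigDecimal decoded = Decimal.toLogical(priceSchema, encoded);    // 12.34 again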

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java
new file mode 100644
index 0000000..7dd32ce
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import java.util.Objects;
+
+/**
+ * <p>
+ *     A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value.
+ * </p>
+ */
+public class Field {
+    private final String name;
+    private final int index;
+    private final Schema schema;
+
+    public Field(String name, int index, Schema schema) {
+        this.name = name;
+        this.index = index;
+        this.schema = schema;
+    }
+
+    /**
+     * Get the name of this field.
+     * @return the name of this field
+     */
+    public String name() {
+        return name;
+    }
+
+
+    /**
+     * Get the index of this field within the struct.
+     * @return the index of this field
+     */
+    public int index() {
+        return index;
+    }
+
+    /**
+     * Get the schema of this field
+     * @return the schema of values of this field
+     */
+    public Schema schema() {
+        return schema;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Field field = (Field) o;
+        return Objects.equals(index, field.index) &&
+                Objects.equals(name, field.name) &&
+                Objects.equals(schema, field.schema);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(name, index, schema);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java
new file mode 100644
index 0000000..3c0e40c
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ *     Definition of an abstract data type. Data types can be primitive types (integer types, floating point types,
+ *     boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema,
+ *     and structs that have a fixed set of field names each with an associated value schema). Any type can be specified
+ *     as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default
+ *     value.
+ * </p>
+ * <p>
+ *     All schemas may have some associated metadata: a name, version, and documentation. These are all considered part
+ *     of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable
+ *     the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are
+ *     just an int64, but the user needs to know about the additional semantics to interpret it properly).
+ * </p>
+ * <p>
+ *     Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler.
+ * </p>
+ */
+public interface Schema {
+    /**
+     * The type of a schema. These only include the core types; logical types must be determined by checking the schema name.
+     */
+    enum Type {
+        INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT;
+
+        private String name;
+
+        Type() {
+            this.name = this.name().toLowerCase();
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public boolean isPrimitive() {
+            switch (this) {
+                case INT8:
+                case INT16:
+                case INT32:
+                case INT64:
+                case FLOAT32:
+                case FLOAT64:
+                case BOOLEAN:
+                case STRING:
+                case BYTES:
+                    return true;
+            }
+            return false;
+        }
+    }
+
+
+    Schema INT8_SCHEMA = SchemaBuilder.int8().build();
+    Schema INT16_SCHEMA = SchemaBuilder.int16().build();
+    Schema INT32_SCHEMA = SchemaBuilder.int32().build();
+    Schema INT64_SCHEMA = SchemaBuilder.int64().build();
+    Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build();
+    Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build();
+    Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build();
+    Schema STRING_SCHEMA = SchemaBuilder.string().build();
+    Schema BYTES_SCHEMA = SchemaBuilder.bytes().build();
+
+    Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build();
+    Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build();
+    Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build();
+    Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build();
+    Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build();
+    Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build();
+    Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build();
+    Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build();
+    Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build();
+
+    /**
+     * @return the type of this schema
+     */
+    Type type();
+
+    /**
+     * @return true if this field is optional, false otherwise
+     */
+    boolean isOptional();
+
+    /**
+     * @return the default value for this schema
+     */
+    Object defaultValue();
+
+    /**
+     * @return the name of this schema
+     */
+    String name();
+
+    /**
+     * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones.
+     * @return the version of this schema
+     */
+    Integer version();
+
+    /**
+     * @return the documentation for this schema
+     */
+    String doc();
+
+    /**
+     * Get a map of schema parameters.
+     * @return Map containing parameters for this schema, or null if there are no parameters
+     */
+    Map<String, String> parameters();
+
+    /**
+     * Get the key schema for this map schema. Throws a DataException if this schema is not a map.
+     * @return the key schema
+     */
+    Schema keySchema();
+
+    /**
+     * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array.
+     * @return the value schema
+     */
+    Schema valueSchema();
+
+    /**
+     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+     * @return the list of fields for this Schema
+     */
+    List<Field> fields();
+
+    /**
+     * Get a field for this Schema by name. Throws a DataException if this schema is not a struct.
+     * @param fieldName the name of the field to look up
+     * @return the Field object for the specified field, or null if there is no field with the given name
+     */
+    Field field(String fieldName);
+
+    /**
+     * Return a concrete instance of the {@link Schema}
+     * @return the {@link Schema}
+     */
+    Schema schema();
+}
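
A brief sketch (not part of the patch) of how the interface is typically consumed; note that logical types are recognized by schema name rather than by a dedicated Type constant:

    Schema ageSchema = Schema.OPTIONAL_INT32_SCHEMA;                 // predefined schemas cover the primitive types
    boolean primitive = ageSchema.type().isPrimitive();              // true
    boolean optional = ageSchema.isOptional();                       // true
    boolean isDate = Date.LOGICAL_NAME.equals(Date.SCHEMA.name());   // true: logical types are identified by name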

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java
new file mode 100644
index 0000000..71198f0
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import java.util.Objects;
+
+public class SchemaAndValue {
+    private final Schema schema;
+    private final Object value;
+
+    public static final SchemaAndValue NULL = new SchemaAndValue(null, null);
+
+    public SchemaAndValue(Schema schema, Object value) {
+        this.value = value;
+        this.schema = schema;
+    }
+
+    public Schema schema() {
+        return schema;
+    }
+
+    public Object value() {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        SchemaAndValue that = (SchemaAndValue) o;
+        return Objects.equals(schema, that.schema) &&
+                Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schema, value);
+    }
+
+    @Override
+    public String toString() {
+        return "SchemaAndValue{" +
+                "schema=" + schema +
+                ", value=" + value +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
new file mode 100644
index 0000000..36cbf91
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java
@@ -0,0 +1,412 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.errors.SchemaBuilderException;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * <p>
+ *     SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
+ *     properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
+ *     are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
+ * </p>
+ * <p>
+ *     Here is an example of building a struct schema:
+ *     <pre>
+ *     Schema dateSchema = SchemaBuilder.struct()
+ *         .name("com.example.CalendarDate").version(2).doc("A calendar date 
including month, day, and year.")
+ *         .field("month", Schema.STRING_SCHEMA)
+ *         .field("day", Schema.INT8_SCHEMA)
+ *         .field("year", Schema.INT16_SCHEMA)
+ *         .build();
+ *     </pre>
+ * </p>
+ * <p>
+ *     Here is an example of using a second SchemaBuilder to construct complex, nested types:
+ *     <pre>
+ *     Schema userListSchema = SchemaBuilder.array(
+ *         SchemaBuilder.struct().name("com.example.User").field("username", 
Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
+ *     ).build();
+ *     </pre>
+ * </p>
+ */
+public class SchemaBuilder implements Schema {
+    private static final String TYPE_FIELD = "type";
+    private static final String OPTIONAL_FIELD = "optional";
+    private static final String DEFAULT_FIELD = "default";
+    private static final String NAME_FIELD = "name";
+    private static final String VERSION_FIELD = "version";
+    private static final String DOC_FIELD = "doc";
+
+
+    private final Type type;
+    private Boolean optional = null;
+    private Object defaultValue = null;
+
+    private List<Field> fields = null;
+    private Schema keySchema = null;
+    private Schema valueSchema = null;
+
+    private String name;
+    private Integer version;
+    // Optional human readable documentation describing this schema.
+    private String doc;
+    // Additional parameters for logical types.
+    private Map<String, String> parameters;
+
+    private SchemaBuilder(Type type) {
+        this.type = type;
+    }
+
+    // Common/metadata fields
+
+    @Override
+    public boolean isOptional() {
+        return optional == null ? false : optional;
+    }
+
+    /**
+     * Set this schema as optional.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder optional() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = true;
+        return this;
+    }
+
+    /**
+     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder required() {
+        checkNull(OPTIONAL_FIELD, optional);
+        optional = false;
+        return this;
+    }
+
+    @Override
+    public Object defaultValue() {
+        return defaultValue;
+    }
+
+    /**
+     * Set the default value for this schema. The value is validated against the schema type, throwing a
+     * {@link SchemaBuilderException} if it does not match.
+     * @param value the default value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder defaultValue(Object value) {
+        checkNull(DEFAULT_FIELD, defaultValue);
+        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
+        try {
+            ConnectSchema.validateValue(this, value);
+        } catch (DataException e) {
+            throw new SchemaBuilderException("Invalid default value", e);
+        }
+        defaultValue = value;
+        return this;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Set the name of this schema.
+     * @param name the schema name
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder name(String name) {
+        checkNull(NAME_FIELD, this.name);
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public Integer version() {
+        return version;
+    }
+
+    /**
+     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
+     * newer and which is older by their ordering.
+     * @param version the schema version
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder version(Integer version) {
+        checkNull(VERSION_FIELD, this.version);
+        this.version = version;
+        return this;
+    }
+
+    @Override
+    public String doc() {
+        return doc;
+    }
+
+    /**
+     * Set the documentation for this schema.
+     * @param doc the documentation
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder doc(String doc) {
+        checkNull(DOC_FIELD, this.doc);
+        this.doc = doc;
+        return this;
+    }
+
+    @Override
+    public Map<String, String> parameters() {
+        // Per the Schema contract, return null (rather than throwing an NPE) when no parameters have been set.
+        return parameters == null ? null : Collections.unmodifiableMap(parameters);
+    }
+
+    /**
+     * Set a schema parameter.
+     * @param propertyName name of the schema property to define
+     * @param propertyValue value of the schema property to define, as a String
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder parameter(String propertyName, String propertyValue) {
+        // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
+        // can print their properties in a consistent order.
+        if (parameters == null)
+            parameters = new LinkedHashMap<>();
+        parameters.put(propertyName, propertyValue);
+        return this;
+    }
+
+    /**
+     * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in
+     * the set of properties passed to this method.
+     * @param props Map of properties to set
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder parameters(Map<String, String> props) {
+        // Avoid creating an empty set of properties so we never have an empty map
+        if (props.isEmpty())
+            return this;
+        if (parameters == null)
+            parameters = new LinkedHashMap<>();
+        parameters.putAll(props);
+        return this;
+    }
+
+    @Override
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Create a SchemaBuilder for the specified type.
+     *
+     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
+     * can be useful when generating schemas dynamically.
+     *
+     * @param type the schema type
+     * @return a new SchemaBuilder
+     */
+    public static SchemaBuilder type(Type type) {
+        return new SchemaBuilder(type);
+    }
+
+    // Primitive types
+
+    /**
+     * @return a new {@link Type#INT8} SchemaBuilder
+     */
+    public static SchemaBuilder int8() {
+        return new SchemaBuilder(Type.INT8);
+    }
+
+    /**
+     * @return a new {@link Type#INT16} SchemaBuilder
+     */
+    public static SchemaBuilder int16() {
+        return new SchemaBuilder(Type.INT16);
+    }
+
+    /**
+     * @return a new {@link Type#INT32} SchemaBuilder
+     */
+    public static SchemaBuilder int32() {
+        return new SchemaBuilder(Type.INT32);
+    }
+
+    /**
+     * @return a new {@link Type#INT64} SchemaBuilder
+     */
+    public static SchemaBuilder int64() {
+        return new SchemaBuilder(Type.INT64);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT32} SchemaBuilder
+     */
+    public static SchemaBuilder float32() {
+        return new SchemaBuilder(Type.FLOAT32);
+    }
+
+    /**
+     * @return a new {@link Type#FLOAT64} SchemaBuilder
+     */
+    public static SchemaBuilder float64() {
+        return new SchemaBuilder(Type.FLOAT64);
+    }
+
+    /**
+     * @return a new {@link Type#BOOLEAN} SchemaBuilder
+     */
+    public static SchemaBuilder bool() {
+        return new SchemaBuilder(Type.BOOLEAN);
+    }
+
+    /**
+     * @return a new {@link Type#STRING} SchemaBuilder
+     */
+    public static SchemaBuilder string() {
+        return new SchemaBuilder(Type.STRING);
+    }
+
+    /**
+     * @return a new {@link Type#BYTES} SchemaBuilder
+     */
+    public static SchemaBuilder bytes() {
+        return new SchemaBuilder(Type.BYTES);
+    }
+
+
+    // Structs
+
+    /**
+     * @return a new {@link Type#STRUCT} SchemaBuilder
+     */
+    public static SchemaBuilder struct() {
+        return new SchemaBuilder(Type.STRUCT);
+    }
+
+    /**
+     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
+     * @param fieldName the name of the field to add
+     * @param fieldSchema the Schema for the field's value
+     * @return the SchemaBuilder
+     */
+    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
+        if (type != Type.STRUCT)
+            throw new SchemaBuilderException("Cannot create fields on type " + 
type);
+        if (fields == null)
+            fields = new ArrayList<>();
+        int fieldIndex = fields.size();
+        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
+        return this;
+    }
+
+    /**
+     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
+     * @return the list of fields for this Schema
+     */
+    public List<Field> fields() {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot list fields on non-struct type");
+        return fields;
+    }
+
+    public Field field(String fieldName) {
+        if (type != Type.STRUCT)
+            throw new DataException("Cannot look up fields on non-struct 
type");
+        for (Field field : fields)
+            if (field.name() == fieldName)
+                return field;
+        return null;
+    }
+
+
+
+    // Maps & Arrays
+
+    /**
+     * @param valueSchema the schema for elements of the array
+     * @return a new {@link Type#ARRAY} SchemaBuilder
+     */
+    public static SchemaBuilder array(Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    /**
+     * @param keySchema the schema for keys in the map
+     * @param valueSchema the schema for values in the map
+     * @return a new {@link Type#MAP} SchemaBuilder
+     */
+    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
+        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
+        builder.keySchema = keySchema;
+        builder.valueSchema = valueSchema;
+        return builder;
+    }
+
+    @Override
+    public Schema keySchema() {
+        return keySchema;
+    }
+
+    @Override
+    public Schema valueSchema() {
+        return valueSchema;
+    }
+
+
+    /**
+     * Build the Schema using the current settings
+     * @return the {@link Schema}
+     */
+    public Schema build() {
+        return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc,
+                parameters == null ? null : Collections.unmodifiableMap(parameters),
+                fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
+    }
+
+    /**
+     * Return a concrete instance of the {@link Schema} specified by this builder
+     * @return the {@link Schema}
+     */
+    @Override
+    public Schema schema() {
+        return build();
+    }
+
+
+    private static void checkNull(String fieldName, Object val) {
+        if (val != null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + 
fieldName + " has already been set.");
+    }
+
+    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
+        if (val == null)
+            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
+    }
+}
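
Beyond the struct and array examples in the class javadoc, a sketch (not part of the patch) of building a map schema with metadata; the name and doc strings are made up for the example:

    Schema settingsSchema = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA)
            .name("com.example.Settings")
            .version(1)
            .doc("Per-user integer settings keyed by setting name.")
            .optional()
            .build();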

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
new file mode 100644
index 0000000..ad0caf8
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.errors.SchemaProjectorException;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * <p>
+ *     SchemaProjector is a utility to project a value between compatible schemas and throw exceptions
+ *     when incompatible schemas are provided.
+ * </p>
+ */
+
+public class SchemaProjector {
+
+    private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>();
+
+    static {
+        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64};
+        for (int i = 0; i < promotableTypes.length; ++i) {
+            for (int j = i; j < promotableTypes.length; ++j) {
+                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
+            }
+        }
+    }
+
+    /**
+     * This method projects a value between compatible schemas and throws an exception when incompatible schemas are provided.
+     * @param source the schema used to construct the record
+     * @param record the value to project from source schema to target schema
+     * @param target the schema to project the record to
+     * @return the projected value with target schema
+     * @throws SchemaProjectorException if the source and target schemas are not compatible
+     */
+    public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException {
+        checkMaybeCompatible(source, target);
+        if (source.isOptional() && !target.isOptional()) {
+            if (target.defaultValue() != null) {
+                if (record != null) {
+                    return projectRequiredSchema(source, record, target);
+                } else {
+                    return target.defaultValue();
+                }
+            } else {
+                throw new SchemaProjectorException("Writer schema is optional, 
however, target schema does not provide a default value.");
+            }
+        } else {
+            if (record != null) {
+                return projectRequiredSchema(source, record, target);
+            } else {
+                return null;
+            }
+        }
+    }
+
+    private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException {
+        switch (target.type()) {
+            case INT8:
+            case INT16:
+            case INT32:
+            case INT64:
+            case FLOAT32:
+            case FLOAT64:
+            case BOOLEAN:
+            case BYTES:
+            case STRING:
+                return projectPrimitive(source, record, target);
+            case STRUCT:
+                return projectStruct(source, (Struct) record, target);
+            case ARRAY:
+                return projectArray(source, record, target);
+            case MAP:
+                return projectMap(source, record, target);
+        }
+        return null;
+    }
+
+    private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException {
+        Struct targetStruct = new Struct(target);
+        for (Field targetField : target.fields()) {
+            String fieldName = targetField.name();
+            Field sourceField = source.field(fieldName);
+            if (sourceField != null) {
+                Object sourceFieldValue = sourceStruct.get(fieldName);
+                try {
+                    Object targetFieldValue = project(sourceField.schema(), sourceFieldValue, targetField.schema());
+                    targetStruct.put(fieldName, targetFieldValue);
+                } catch (SchemaProjectorException e) {
+                    throw new SchemaProjectorException("Error projecting " + 
sourceField.name(), e);
+                }
+            } else {
+                Object targetDefault;
+                if (targetField.schema().defaultValue() != null) {
+                    targetDefault = targetField.schema().defaultValue();
+                } else {
+                    throw new SchemaProjectorException("Cannot project " + 
source.schema() + " to " + target.schema());
+                }
+                targetStruct.put(fieldName, targetDefault);
+            }
+        }
+        return targetStruct;
+    }
+
+
+    private static void checkMaybeCompatible(Schema source, Schema target) {
+        if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
+            throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type() + " and target type: " + target.type());
+        } else if (!Objects.equals(source.name(), target.name())) {
+            throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name() + " and target name: " + target.name());
+        } else if (!Objects.equals(source.parameters(), target.parameters())) {
+            throw new SchemaProjectorException("Schema parameters not equal. source parameters: " + source.parameters() + " and target parameters: " + target.parameters());
+        }
+    }
+
+    private static Object projectArray(Schema source, Object record, Schema target) throws SchemaProjectorException {
+        List<?> array = (List<?>) record;
+        List<Object> retArray = new ArrayList<>();
+        for (Object entry : array) {
+            retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
+        }
+        return retArray;
+    }
+
+    private static Object projectMap(Schema source, Object record, Schema target) throws SchemaProjectorException {
+        Map<?, ?> map = (Map<?, ?>) record;
+        Map<Object, Object> retMap = new HashMap<>();
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            Object key = entry.getKey();
+            Object value = entry.getValue();
+            Object retKey = project(source.keySchema(), key, target.keySchema());
+            Object retValue = project(source.valueSchema(), value, target.valueSchema());
+            retMap.put(retKey, retValue);
+        }
+        return retMap;
+    }
+
+    private static Object projectPrimitive(Schema source, Object record, Schema target) throws SchemaProjectorException {
+        assert source.type().isPrimitive();
+        assert target.type().isPrimitive();
+        Object result;
+        if (isPromotable(source.type(), target.type())) {
+            Number numberRecord = (Number) record;
+            switch (target.type()) {
+                case INT8:
+                    result = numberRecord.byteValue();
+                    break;
+                case INT16:
+                    result = numberRecord.shortValue();
+                    break;
+                case INT32:
+                    result = numberRecord.intValue();
+                    break;
+                case INT64:
+                    result = numberRecord.longValue();
+                    break;
+                case FLOAT32:
+                    result = numberRecord.floatValue();
+                    break;
+                case FLOAT64:
+                    result = numberRecord.doubleValue();
+                    break;
+                default:
+                    throw new SchemaProjectorException("Not promotable type.");
+            }
+        } else {
+            result = record;
+        }
+        return result;
+    }
+
+    private static boolean isPromotable(Type sourceType, Type targetType) {
+        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
+    }
+}
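
A sketch (not part of the patch) of projecting a record written with an older schema onto a newer one; the schemas are hypothetical, and the example relies on INT32 -> INT64 being a promotable pair and on the new field carrying a default value:

    Schema v1 = SchemaBuilder.struct().name("com.example.User").version(1)
            .field("id", Schema.INT32_SCHEMA)
            .build();
    Schema v2 = SchemaBuilder.struct().name("com.example.User").version(2)
            .field("id", Schema.INT64_SCHEMA)                                     // promoted from INT32
            .field("active", SchemaBuilder.bool().defaultValue(true).build())     // new field must have a default
            .build();

    Struct oldRecord = new Struct(v1).put("id", 42);
    Struct newRecord = (Struct) SchemaProjector.project(v1, oldRecord, v2);       // {id=42L, active=true}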

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
new file mode 100644
index 0000000..4ca37c3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java
@@ -0,0 +1,265 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * <p>
+ *     A structured record containing a set of named fields with values, each field using an independent {@link Schema}.
+ *     Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set.
+ * </p>
+ * <p>
+ *     The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing
+ *     complete objects:
+ *     <pre>
+ *         Schema schema = SchemaBuilder.struct().name("com.example.Person")
+ *             .field("name", Schema.STRING_SCHEMA).field("age", 
Schema.INT32_SCHEMA).build()
+ *         Struct struct = new Struct(schema).put("name", "Bobby 
McGee").put("age", 21)
+ *     </pre>
+ * </p>
+ */
+public class Struct {
+
+    private final Schema schema;
+    private final Object[] values;
+
+    /**
+     * Create a new Struct for this {@link Schema}
+     * @param schema the {@link Schema} for the Struct
+     */
+    public Struct(Schema schema) {
+        if (schema.type() != Schema.Type.STRUCT)
+            throw new DataException("Not a struct schema: " + schema);
+        this.schema = schema;
+        this.values = new Object[schema.fields().size()];
+    }
+
+    /**
+     * Get the schema for this Struct.
+     * @return the Struct's schema
+     */
+    public Schema schema() {
+        return schema;
+    }
+
+    /**
+     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+     * must be cast to a more specific type.
+     * @param fieldName the field name to lookup
+     * @return the value for the field
+     */
+    public Object get(String fieldName) {
+        Field field = lookupField(fieldName);
+        return get(field);
+    }
+
+    /**
+     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
+     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
+     * must be cast to a more specific type.
+     * @param field the field to lookup
+     * @return the value for the field
+     */
+    public Object get(Field field) {
+        Object val = values[field.index()];
+        // Fall back to the default declared on the field's schema, as described in the javadoc above.
+        if (val == null && field.schema().defaultValue() != null) {
+            val = field.schema().defaultValue();
+        }
+        return val;
+    }
+
+    /**
+     * Get the underlying raw value for the field without accounting for default values.
+     * @param fieldName the field to get the value of
+     * @return the raw value
+     */
+    public Object getWithoutDefault(String fieldName) {
+        Field field = lookupField(fieldName);
+        return values[field.index()];
+    }
+
+    // Note that all getters have to have boxed return types since the fields might be optional
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Byte.
+     */
+    public Byte getInt8(String fieldName) {
+        return (Byte) getCheckType(fieldName, Schema.Type.INT8);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Short.
+     */
+    public Short getInt16(String fieldName) {
+        return (Short) getCheckType(fieldName, Schema.Type.INT16);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to an Integer.
+     */
+    public Integer getInt32(String fieldName) {
+        return (Integer) getCheckType(fieldName, Schema.Type.INT32);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Long.
+     */
+    public Long getInt64(String fieldName) {
+        return (Long) getCheckType(fieldName, Schema.Type.INT64);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Float.
+     */
+    public Float getFloat32(String fieldName) {
+        return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Double.
+     */
+    public Double getFloat64(String fieldName) {
+        return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Boolean.
+     */
+    public Boolean getBoolean(String fieldName) {
+        return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a String.
+     */
+    public String getString(String fieldName) {
+        return (String) getCheckType(fieldName, Schema.Type.STRING);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a byte[].
+     */
+    public byte[] getBytes(String fieldName) {
+        Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
+        if (bytes instanceof ByteBuffer)
+            return ((ByteBuffer) bytes).array();
+        return (byte[]) bytes;
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a List.
+     */
+    public <T> List<T> getArray(String fieldName) {
+        return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Map.
+     */
+    public <K, V> Map<K, V> getMap(String fieldName) {
+        return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
+    }
+
+    /**
+     * Equivalent to calling {@link #get(String)} and casting the result to a Struct.
+     */
+    public Struct getStruct(String fieldName) {
+        return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
+    }
+
+    /**
+     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+     * {@link Schema}.
+     * @param fieldName the name of the field to set
+     * @param value the value of the field
+     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+     */
+    public Struct put(String fieldName, Object value) {
+        Field field = lookupField(fieldName);
+        return put(field, value);
+    }
+
+    /**
+     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
+     * {@link Schema}.
+     * @param field the field to set
+     * @param value the value of the field
+     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
+     */
+    public Struct put(Field field, Object value) {
+        ConnectSchema.validateValue(field.schema(), value);
+        values[field.index()] = value;
+        return this;
+    }
+
+
+    /**
+     * Validates that this struct has filled in all the necessary data with valid values. For required fields
+     * without defaults, this validates that a value has been set and has matching types/schemas. If any validation
+     * fails, throws a DataException.
+     */
+    public void validate() {
+        for (Field field : schema.fields()) {
+            Schema fieldSchema = field.schema();
+            Object value = values[field.index()];
+            if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null))
+                continue;
+            ConnectSchema.validateValue(fieldSchema, value);
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        Struct struct = (Struct) o;
+        return Objects.equals(schema, struct.schema) &&
+                Arrays.equals(values, struct.values);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(schema, Arrays.hashCode(values));
+    }
+
+    private Field lookupField(String fieldName) {
+        Field field = schema.field(fieldName);
+        if (field == null)
+            throw new DataException(fieldName + " is not a valid field name");
+        return field;
+    }
+
+    // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't.
+    // Used to implement the get*() methods that return typed data instead of Object
+    private Object getCheckType(String fieldName, Schema.Type type) {
+        Field field = lookupField(fieldName);
+        if (field.schema().type() != type)
+            throw new DataException("Field '" + fieldName + "' is not of type 
" + type);
+        return values[field.index()];
+    }
+
+}
+
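
A usage sketch (not part of the patch) showing the typed getters and validate(); the schema and values are made-up examples:

    Schema personSchema = SchemaBuilder.struct().name("com.example.Person")
            .field("name", Schema.STRING_SCHEMA)
            .field("age", Schema.OPTIONAL_INT32_SCHEMA)
            .build();

    Struct person = new Struct(personSchema).put("name", "Bobby McGee");
    person.validate();                            // passes: "age" is optional and may remain unset

    String name = person.getString("name");       // typed getters check the field's schema type
    Integer age = person.getInt32("age");         // null: no value set and no default declared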

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java
new file mode 100644
index 0000000..cecd891
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.Calendar;
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
+ *     java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
+ *     point in time during the first day after the Unix epoch. The underlying representation is an integer
+ *     representing the number of milliseconds after midnight.
+ * </p>
+ */
+public class Time {
+    public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Time";
+
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int32()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Time) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static int fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Time object but 
the schema does not match.");
+        Calendar calendar = Calendar.getInstance(UTC);
+        calendar.setTime(value);
+        long unixMillis = calendar.getTimeInMillis();
+        if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) {
+            throw new DataException("Kafka Connect Time type should not have 
any date fields set to non-zero values.");
+        }
+        return (int) unixMillis;
+    }
+
+    public static java.util.Date toLogical(Schema schema, int value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Date object but 
the schema does not match.");
+        if (value < 0 || value > MILLIS_PER_DAY)
+            throw new DataException("Time values must use a number of milliseconds between 0 and 86400000");
+        return new java.util.Date(value);
+    }
+}
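
A sketch (not part of the patch) of the Time round trip; 14:30 UTC is an arbitrary example, and the Calendar/TimeZone imports are assumed:

    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
    cal.clear();                                    // keeps the date fields at the epoch day
    cal.set(Calendar.HOUR_OF_DAY, 14);
    cal.set(Calendar.MINUTE, 30);

    int millisPastMidnight = Time.fromLogical(Time.SCHEMA, cal.getTime());       // 52200000
    java.util.Date time = Time.toLogical(Time.SCHEMA, millisPastMidnight);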

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java 
b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
new file mode 100644
index 0000000..c447f6d
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.errors.DataException;
+
+import java.util.TimeZone;
+
+/**
+ * <p>
+ *     A timestamp representing an absolute time, without timezone 
information. The corresponding Java type is a
+ *     java.util.Date. The underlying representation is a long representing 
the number of milliseconds since Unix epoch.
+ * </p>
+ */
+public class Timestamp {
+    public static final String LOGICAL_NAME = 
"org.apache.kafka.connect.data.Timestamp";
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+
+    /**
+     * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder 
you can override additional schema settings such
+     * as required/optional, default value, and documentation.
+     * @return a SchemaBuilder
+     */
+    public static SchemaBuilder builder() {
+        return SchemaBuilder.int64()
+                .name(LOGICAL_NAME)
+                .version(1);
+    }
+
+    public static final Schema SCHEMA = builder().schema();
+
+    /**
+     * Convert a value from its logical format (Timestamp) to its encoded format.
+     * @param value the logical value
+     * @return the encoded value
+     */
+    public static long fromLogical(Schema schema, java.util.Date value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Timestamp object 
but the schema does not match.");
+        return value.getTime();
+    }
+
+    public static java.util.Date toLogical(Schema schema, long value) {
+        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
+            throw new DataException("Requested conversion of Timestamp object 
but the schema does not match.");
+        return new java.util.Date(value);
+    }
+}
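
Similarly, a usage sketch for the Timestamp logical type (again not part of the patch, and assuming only the classes above): values round-trip through the int64 millisecond encoding, and builder() can attach the logical name to a customized schema.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Timestamp;

    public class TimestampExample {
        public static void main(String[] args) {
            java.util.Date now = new java.util.Date();

            // Encoded form: milliseconds since the Unix epoch, as an int64.
            long encoded = Timestamp.fromLogical(Timestamp.SCHEMA, now);

            // Decoding yields an equal java.util.Date.
            java.util.Date decoded = Timestamp.toLogical(Timestamp.SCHEMA, encoded);
            System.out.println(now.equals(decoded));   // true

            // The builder allows overriding schema settings while keeping the logical name.
            Schema optionalTimestamp = Timestamp.builder()
                    .optional()
                    .doc("Event time, UTC")
                    .schema();
            System.out.println(optionalTimestamp.name());  // org.apache.kafka.connect.data.Timestamp
        }
    }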

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java
new file mode 100644
index 0000000..1202be3
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+/**
+ * ConnectException is the top-level exception type generated by Kafka Connect 
and connector implementations.
+ */
+@InterfaceStability.Unstable
+public class ConnectException extends KafkaException {
+
+    public ConnectException(String s) {
+        super(s);
+    }
+
+    public ConnectException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public ConnectException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java
new file mode 100644
index 0000000..75426a3
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Base class for all Kafka Connect data API exceptions.
+ */
+public class DataException extends ConnectException {
+    public DataException(String s) {
+        super(s);
+    }
+
+    public DataException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public DataException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
new file mode 100644
index 0000000..8310059
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates that a method has been invoked illegally or at an invalid time by 
a connector or task.
+ */
+public class IllegalWorkerStateException extends ConnectException {
+    public IllegalWorkerStateException(String s) {
+        super(s);
+    }
+
+    public IllegalWorkerStateException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public IllegalWorkerStateException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
new file mode 100644
index 0000000..6f0e551
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+public class SchemaBuilderException extends DataException {
+    public SchemaBuilderException(String s) {
+        super(s);
+    }
+
+    public SchemaBuilderException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public SchemaBuilderException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
new file mode 100644
index 0000000..3e2a763
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ **/
+package org.apache.kafka.connect.errors;
+
+public class SchemaProjectorException extends DataException {
+    public SchemaProjectorException(String s) {
+        super(s);
+    }
+
+    public SchemaProjectorException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public SchemaProjectorException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
new file mode 100644
index 0000000..fbe6975
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Connector;
+
+/**
+ * SinkConnectors implement the Connector interface to send Kafka data to 
another system.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkConnector extends Connector {
+
+    /**
+     * <p>
+     * Configuration key for the list of input topics for this connector.
+     * </p>
+     * <p>
+     * Usually this setting is only relevant to the Kafka Connect framework, 
but is provided here for
+     * the convenience of Connector developers if they also need to know the 
set of topics.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+}
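
To make the role of SinkConnector concrete, here is a minimal hypothetical subclass (not from the patch); FileSinkConnector and FileSinkTask are illustrative names, with the task class sketched after SinkTask.java further below:

    import org.apache.kafka.connect.connector.Task;
    import org.apache.kafka.connect.sink.SinkConnector;

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FileSinkConnector extends SinkConnector {
        private Map<String, String> configProps;

        @Override
        public String version() {
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            // The framework passes the connector configuration, including TOPICS_CONFIG ("topics").
            this.configProps = props;
        }

        @Override
        public Class<? extends Task> taskClass() {
            return FileSinkTask.class;   // hypothetical SinkTask implementation, sketched later
        }

        @Override
        public List<Map<String, String>> taskConfigs(int maxTasks) {
            // Give each task the same configuration; real connectors often split work across tasks here.
            List<Map<String, String>> configs = new ArrayList<>();
            for (int i = 0; i < maxTasks; i++)
                configs.add(new HashMap<>(configProps));
            return configs;
        }

        @Override
        public void stop() {
            // Nothing to clean up in this sketch.
        }
    }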

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
new file mode 100644
index 0000000..0bd0f6f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and 
includes the kafkaOffset of
+ * the record in the Kafka topic-partition in addition to the standard fields. 
This information
+ * should be used by the SinkTask to coordinate kafkaOffset commits.
+ */
+@InterfaceStability.Unstable
+public class SinkRecord extends ConnectRecord {
+    private final long kafkaOffset;
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object 
key, Schema valueSchema, Object value, long kafkaOffset) {
+        super(topic, partition, keySchema, key, valueSchema, value);
+        this.kafkaOffset = kafkaOffset;
+    }
+
+    public long kafkaOffset() {
+        return kafkaOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+
+        SinkRecord that = (SinkRecord) o;
+
+        if (kafkaOffset != that.kafkaOffset)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkRecord{" +
+                "kafkaOffset=" + kafkaOffset +
+                "} " + super.toString();
+    }
+}
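
A quick construction sketch for SinkRecord (not part of the patch), assuming the constructor shown above and the accessors inherited from ConnectRecord:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class SinkRecordExample {
        public static void main(String[] args) {
            // A record read from partition 0 of topic "metrics" at offset 42, with string key and value.
            SinkRecord record = new SinkRecord("metrics", 0,
                    Schema.STRING_SCHEMA, "host-1",
                    Schema.STRING_SCHEMA, "cpu=0.75",
                    42L);

            System.out.println(record.topic());       // metrics
            System.out.println(record.kafkaOffset()); // 42
            System.out.println(record.value());       // cpu=0.75
        }
    }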

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
new file mode 100644
index 0000000..7e793c8
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In
+ * addition to the basic {@link #put} interface, SinkTasks must also implement 
{@link #flush}
+ * to support offset commits.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkTask implements Task {
+
+    /**
+     * <p>
+     * The configuration key that provides the list of topics that are inputs 
for this
+     * SinkTask.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+    protected SinkTaskContext context;
+
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Start the Task. This should handle any configuration parsing and 
one-time setup of the task.
+     * @param props initial configuration
+     */
+    @Override
+    public abstract void start(Map<String, String> props);
+
+    /**
+     * Put the records in the sink. Usually this should send the records to 
the sink asynchronously
+     * and immediately return.
+     *
+     * If this operation fails, the SinkTask may throw a {@link 
org.apache.kafka.connect.errors.RetriableException} to
+     * indicate that the framework should attempt to retry the same call 
again. Other exceptions will cause the task to
+     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be 
used to set the maximum time before the
+     * batch will be retried.
+     *
+     * @param records the set of records to send
+     */
+    public abstract void put(Collection<SinkRecord> records);
+
+    /**
+     * Flush all records that have been {@link #put} for the specified 
topic-partitions. The
+     * offsets are provided for convenience, but could also be determined by 
tracking all offsets
+     * included in the SinkRecords passed to {@link #put}.
+     *
+     * @param offsets mapping of TopicPartition to committed offset
+     */
+    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+    /**
+     * The SinkTask uses this method to create writers for newly assigned partitions in case of partition
+     * re-assignment. In partition re-assignment, some new partitions may be 
assigned to the SinkTask.
+     * The SinkTask needs to create writers and perform necessary recovery for 
the newly assigned partitions.
+     * This method will be called after partition re-assignment completes and 
before the SinkTask starts
+     * fetching data.
+     * @param partitions The list of partitions that are now assigned to the 
task (may include
+     *                   partitions previously assigned to the task)
+     */
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    }
+
+    /**
+     * The SinkTask uses this method to close writers and commit offsets for partitions that are
+     * no longer assigned to the SinkTask. This method will be called before a rebalance operation starts
+     * and after the SinkTask stops fetching data.
+     * @param partitions The list of partitions that were assigned to the 
consumer on the last
+     *                   rebalance
+     */
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    }
+
+    /**
+     * Perform any cleanup to stop this task. In SinkTasks, this method is 
invoked only once outstanding calls to other
+     * methods have completed (e.g., {@link #put(Collection)} has returned) 
and a final {@link #flush(Map)} and offset
+     * commit has completed. Implementations of this method should only need 
to perform final cleanup operations, such
+     * as closing network connections to the sink system.
+     */
+    public abstract void stop();
+}
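
And a minimal hypothetical SinkTask (the FileSinkTask referenced by the connector sketch above, not part of the patch) showing how put(), flush(), and the lifecycle methods fit together:

    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkRecord;
    import org.apache.kafka.connect.sink.SinkTask;

    import java.io.PrintStream;
    import java.util.Collection;
    import java.util.Map;

    public class FileSinkTask extends SinkTask {
        private PrintStream out;

        @Override
        public String version() {
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
            // One-time setup; a real task would open its destination (file, database connection, ...) here.
            out = System.out;
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            // Deliver each record to the sink; buffering and asynchronous writes are common here.
            for (SinkRecord record : records)
                out.println(record.topic() + "@" + record.kafkaOffset() + ": " + record.value());
        }

        @Override
        public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
            // Everything handed to put() must be durable before the framework commits these offsets.
            out.flush();
        }

        @Override
        public void stop() {
            // Final cleanup; called only after outstanding put()/flush() calls have completed.
        }
    }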

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
new file mode 100644
index 0000000..2202cae
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Context passed to SinkTasks, allowing them to access utilities in the Kafka 
Connect runtime.
+ */
+@InterfaceStability.Unstable
+public interface SinkTaskContext {
+    /**
+     * Reset the consumer offsets for the given topic partitions. SinkTasks 
should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For 
example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the SinkTask is 
started or a rebalance occurs, the task
+     * would reload offsets from HDFS and use this method to reset the 
consumer to those offsets.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this 
method.
+     *
+     * @param offsets map of offsets for topic partitions
+     */
+    void offset(Map<TopicPartition, Long> offsets);
+
+    /**
+     * Reset the consumer offsets for the given topic partition. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For 
example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the topic 
partition is recovered the task
+     * would reload offsets from HDFS and use this method to reset the 
consumer to the offset.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this 
method.
+     *
+     * @param tp the topic partition whose offset should be reset.
+     * @param offset the offset to reset to.
+     */
+    void offset(TopicPartition tp, long offset);
+
+    /**
+     * Set the timeout in milliseconds. SinkTasks should use this to indicate 
that they need to retry certain
+     * operations after the timeout. SinkTasks may have operations on external systems that need
+     * to be retried in case of failures. For example, appending a record to an HDFS file may fail due to
+     * temporary network issues. SinkTasks use this method to set how long to wait before retrying.
+     * @param timeoutMs the backoff timeout in milliseconds.
+     */
+    void timeout(long timeoutMs);
+
+    /**
+     * Get the current set of assigned TopicPartitions for this task.
+     * @return the set of currently assigned TopicPartitions
+     */
+    Set<TopicPartition> assignment();
+
+    /**
+     * Pause consumption of messages from the specified TopicPartitions.
+     * @param partitions the partitions which should be paused
+     */
+    void pause(TopicPartition... partitions);
+
+    /**
+     * Resume consumption of messages from previously paused TopicPartitions.
+     * @param partitions the partitions to resume
+     */
+    void resume(TopicPartition... partitions);
+}
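
As a sketch of how a task that owns its offsets might use this context (not from the patch; loadOffsetFromStore() is a hypothetical helper standing in for, e.g., reading a marker file from HDFS):

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.connect.sink.SinkTask;

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public abstract class OffsetManagingSinkTask extends SinkTask {

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            Map<TopicPartition, Long> offsets = new HashMap<>();
            for (TopicPartition tp : partitions)
                offsets.put(tp, loadOffsetFromStore(tp));
            // Rewind the consumer to the offsets recorded in the sink's own store.
            context.offset(offsets);
            // If the sink is temporarily unavailable, consumption could also be paused:
            // context.pause(partitions.toArray(new TopicPartition[0]));
        }

        // Hypothetical: look up the last offset the sink durably wrote for this partition.
        protected abstract long loadOffsetFromStore(TopicPartition tp);
    }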

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
----------------------------------------------------------------------
diff --git 
a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
 
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
new file mode 100644
index 0000000..2ba5139
--- /dev/null
+++ 
b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Connector;
+
+/**
+ * SourceConnectors implement the Connector interface to pull data from another system and send
+ * it to Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceConnector extends Connector {
+
+}
