http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java new file mode 100644 index 0000000..842da66 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Date.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; + +import java.util.Calendar; +import java.util.TimeZone; + +/** + * <p> + * A date representing a calendar day with no time of day or timezone. The corresponding Java type is a java.util.Date + * with hours, minutes, seconds, and milliseconds set to 0. The underlying representation is an integer representing the + * number of standardized days (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 seconds/minute, + * 1000 milliseconds/second) since the Unix epoch. + * </p> + */ +public class Date { + public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Date"; + + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + /** + * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you can override additional schema settings such + * as required/optional, default value, and documentation. + * @return a SchemaBuilder + */ + public static SchemaBuilder builder() { + return SchemaBuilder.int32() + .name(LOGICAL_NAME) + .version(1); + } + + public static final Schema SCHEMA = builder().schema(); + + /** + * Convert a value from its logical format (Date) to its encoded format.
+ * @param value the logical value + * @return the encoded value + */ + public static int fromLogical(Schema schema, java.util.Date value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Date object but the schema does not match."); + Calendar calendar = Calendar.getInstance(UTC); + calendar.setTime(value); + if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 || + calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) { + throw new DataException("Kafka Connect Date type should not have any time fields set to non-zero values."); + } + long unixMillis = calendar.getTimeInMillis(); + return (int) (unixMillis / MILLIS_PER_DAY); + } + + public static java.util.Date toLogical(Schema schema, int value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Date object but the schema does not match."); + return new java.util.Date(value * MILLIS_PER_DAY); + } +}
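A minimal usage sketch of the Date logical type above; the class name and values here are invented for illustration and are not part of the commit:

    import org.apache.kafka.connect.data.Date;
    import org.apache.kafka.connect.data.Schema;

    import java.util.Calendar;
    import java.util.TimeZone;

    public class DateExample {
        public static void main(String[] args) {
            // Build a java.util.Date at midnight UTC, as the logical type requires.
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.clear();
            calendar.set(2015, Calendar.NOVEMBER, 9);

            Schema schema = Date.SCHEMA;
            // Encode to the number of days since the Unix epoch...
            int days = Date.fromLogical(schema, calendar.getTime());
            // ...and decode back to a java.util.Date at midnight UTC.
            java.util.Date roundTrip = Date.toLogical(schema, days);
            System.out.println(days + " days -> " + roundTrip);
        }
    }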
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java new file mode 100644 index 0000000..e15f698 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Decimal.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; + +import java.math.BigDecimal; +import java.math.BigInteger; + +/** + * <p> + * An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where: + * <ul> + * <li>unscaled is an integer</li> + * <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li> + * </ul> + * </p> + * <p> + * Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema + * rather than being part of the value. + * </p> + * <p> + * The underlying representation of this type is bytes containing a two's complement integer. + * </p> + */ +public class Decimal { + public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal"; + public static final String SCALE_FIELD = "scale"; + + /** + * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override + * additional schema settings such as required/optional, default value, and documentation. + * @param scale the scale factor to apply to unscaled values + * @return a SchemaBuilder + */ + public static SchemaBuilder builder(int scale) { + return SchemaBuilder.bytes() + .name(LOGICAL_NAME) + .parameter(SCALE_FIELD, Integer.toString(scale)) + .version(1); + } + + public static Schema schema(int scale) { + return builder(scale).build(); + } + + /** + * Convert a value from its logical format (BigDecimal) to its encoded format.
+ * @param value the logical value + * @return the encoded value + */ + public static byte[] fromLogical(Schema schema, BigDecimal value) { + if (value.scale() != scale(schema)) + throw new DataException("BigDecimal has mismatching scale value for given Decimal schema"); + return value.unscaledValue().toByteArray(); + } + + public static BigDecimal toLogical(Schema schema, byte[] value) { + return new BigDecimal(new BigInteger(value), scale(schema)); + } + + private static int scale(Schema schema) { + String scaleString = schema.parameters().get(SCALE_FIELD); + if (scaleString == null) + throw new DataException("Invalid Decimal schema: scale parameter not found."); + try { + return Integer.parseInt(scaleString); + } catch (NumberFormatException e) { + throw new DataException("Invalid scale parameter found in Decimal schema: ", e); + } + } +}
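A brief sketch of Decimal's encoding, with a scale of 2 chosen arbitrarily for the example (class name invented):

    import org.apache.kafka.connect.data.Decimal;
    import org.apache.kafka.connect.data.Schema;

    import java.math.BigDecimal;

    public class DecimalExample {
        public static void main(String[] args) {
            // A decimal schema with scale 2, e.g. for currency amounts.
            Schema schema = Decimal.schema(2);

            // 12.34 encodes as the two's complement bytes of the unscaled value 1234.
            byte[] encoded = Decimal.fromLogical(schema, new BigDecimal("12.34"));
            BigDecimal decoded = Decimal.toLogical(schema, encoded);
            System.out.println(decoded); // 12.34
        }
    }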
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java new file mode 100644 index 0000000..7dd32ce --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Field.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import java.util.Objects; + +/** + * <p> + * A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value. + * </p> + */ +public class Field { + private final String name; + private final int index; + private final Schema schema; + + public Field(String name, int index, Schema schema) { + this.name = name; + this.index = index; + this.schema = schema; + } + + /** + * Get the name of this field. + * @return the name of this field + */ + public String name() { + return name; + } + + + /** + * Get the index of this field within the struct. + * @return the index of this field + */ + public int index() { + return index; + } + + /** + * Get the schema of this field. + * @return the schema of values of this field + */ + public Schema schema() { + return schema; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Field field = (Field) o; + return Objects.equals(index, field.index) && + Objects.equals(name, field.name) && + Objects.equals(schema, field.schema); + } + + @Override + public int hashCode() { + return Objects.hash(name, index, schema); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java new file mode 100644 index 0000000..3c0e40c --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Schema.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import java.util.List; +import java.util.Map; + +/** + * <p> + * Definition of an abstract data type. Data types can be primitive types (integer types, floating point types, + * boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema, + * and structs that have a fixed set of field names each with an associated value schema). Any type can be specified + * as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default + * value. + * </p> + * <p> + * All schemas may have some associated metadata: a name, version, and documentation. These are all considered part + * of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable + * the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are + * just an int64, but the user needs to know about the additional semantics to interpret it properly). + * </p> + * <p> + * Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler. + * </p> + */ +public interface Schema { + /** + * The type of a schema. These only include the core types; logical types must be determined by checking the schema name.
+ */ + enum Type { + INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT; + + private String name; + + Type() { + this.name = this.name().toLowerCase(); + } + + public String getName() { + return name; + } + + public boolean isPrimitive() { + switch (this) { + case INT8: + case INT16: + case INT32: + case INT64: + case FLOAT32: + case FLOAT64: + case BOOLEAN: + case STRING: + case BYTES: + return true; + } + return false; + } + } + + + Schema INT8_SCHEMA = SchemaBuilder.int8().build(); + Schema INT16_SCHEMA = SchemaBuilder.int16().build(); + Schema INT32_SCHEMA = SchemaBuilder.int32().build(); + Schema INT64_SCHEMA = SchemaBuilder.int64().build(); + Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build(); + Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build(); + Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build(); + Schema STRING_SCHEMA = SchemaBuilder.string().build(); + Schema BYTES_SCHEMA = SchemaBuilder.bytes().build(); + + Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build(); + Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build(); + Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build(); + Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build(); + Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build(); + Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build(); + Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build(); + Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build(); + Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build(); + + /** + * @return the type of this schema + */ + Type type(); + + /** + * @return true if this field is optional, false otherwise + */ + boolean isOptional(); + + /** + * @return the default value for this schema + */ + Object defaultValue(); + + /** + * @return the name of this schema + */ + String name(); + + /** + * Get the optional version of the schema. If a version is included, newer versions *must* be larger than older ones. + * @return the version of this schema + */ + Integer version(); + + /** + * @return the documentation for this schema + */ + String doc(); + + /** + * Get a map of schema parameters. + * @return Map containing parameters for this schema, or null if there are no parameters + */ + Map<String, String> parameters(); + + /** + * Get the key schema for this map schema. Throws a DataException if this schema is not a map. + * @return the key schema + */ + Schema keySchema(); + + /** + * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array. + * @return the value schema + */ + Schema valueSchema(); + + /** + * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct. + * @return the list of fields for this Schema + */ + List<Field> fields(); + + /** + * Get a field for this Schema by name. Throws a DataException if this schema is not a struct. 
+ * @param fieldName the name of the field to look up + * @return the Field object for the specified field, or null if there is no field with the given name + */ + Field field(String fieldName); + + /** + * Return a concrete instance of the {@link Schema} + * @return the {@link Schema} + */ + Schema schema(); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java new file mode 100644 index 0000000..71198f0 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaAndValue.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import java.util.Objects; + +public class SchemaAndValue { + private final Schema schema; + private final Object value; + + public static final SchemaAndValue NULL = new SchemaAndValue(null, null); + + public SchemaAndValue(Schema schema, Object value) { + this.value = value; + this.schema = schema; + } + + public Schema schema() { + return schema; + } + + public Object value() { + return value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SchemaAndValue that = (SchemaAndValue) o; + return Objects.equals(schema, that.schema) && + Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(schema, value); + } + + @Override + public String toString() { + return "SchemaAndValue{" + + "schema=" + schema + + ", value=" + value + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java new file mode 100644 index 0000000..36cbf91 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaBuilder.java @@ -0,0 +1,412 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.errors.SchemaBuilderException; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * <p> + * SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the + * properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types + * are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline. + * </p> + * <p> + * Here is an example of building a struct schema: + * <pre> + * Schema dateSchema = SchemaBuilder.struct() + * .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.") + * .field("month", Schema.STRING_SCHEMA) + * .field("day", Schema.INT8_SCHEMA) + * .field("year", Schema.INT16_SCHEMA) + * .build(); + * </pre> + * </p> + * <p> + * Here is an example of using a second SchemaBuilder to construct complex, nested types: + * <pre> + * Schema userListSchema = SchemaBuilder.array( + * SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build() + * ).build(); + * </pre> + * </p> + */ +public class SchemaBuilder implements Schema { + private static final String TYPE_FIELD = "type"; + private static final String OPTIONAL_FIELD = "optional"; + private static final String DEFAULT_FIELD = "default"; + private static final String NAME_FIELD = "name"; + private static final String VERSION_FIELD = "version"; + private static final String DOC_FIELD = "doc"; + + + private final Type type; + private Boolean optional = null; + private Object defaultValue = null; + + private List<Field> fields = null; + private Schema keySchema = null; + private Schema valueSchema = null; + + private String name; + private Integer version; + // Optional human readable documentation describing this schema. + private String doc; + // Additional parameters for logical types. + private Map<String, String> parameters; + + private SchemaBuilder(Type type) { + this.type = type; + } + + // Common/metadata fields + + @Override + public boolean isOptional() { + return optional == null ? false : optional; + } + + /** + * Set this schema as optional. + * @return the SchemaBuilder + */ + public SchemaBuilder optional() { + checkNull(OPTIONAL_FIELD, optional); + optional = true; + return this; + } + + /** + * Set this schema as required. This is the default, but this method can be used to make this choice explicit. + * @return the SchemaBuilder + */ + public SchemaBuilder required() { + checkNull(OPTIONAL_FIELD, optional); + optional = false; + return this; + } + + @Override + public Object defaultValue() { + return defaultValue; + } + + /** + * Set the default value for this schema. The value is validated against the schema type, throwing a + * {@link SchemaBuilderException} if it does not match. 
+ * @param value the default value + * @return the SchemaBuilder + */ + public SchemaBuilder defaultValue(Object value) { + checkNull(DEFAULT_FIELD, defaultValue); + checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD); + try { + ConnectSchema.validateValue(this, value); + } catch (DataException e) { + throw new SchemaBuilderException("Invalid default value", e); + } + defaultValue = value; + return this; + } + + @Override + public String name() { + return name; + } + + /** + * Set the name of this schema. + * @param name the schema name + * @return the SchemaBuilder + */ + public SchemaBuilder name(String name) { + checkNull(NAME_FIELD, this.name); + this.name = name; + return this; + } + + @Override + public Integer version() { + return version; + } + + /** + * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is + * newer and which is older by their ordering. + * @param version the schema version + * @return the SchemaBuilder + */ + public SchemaBuilder version(Integer version) { + checkNull(VERSION_FIELD, this.version); + this.version = version; + return this; + } + + @Override + public String doc() { + return doc; + } + + /** + * Set the documentation for this schema. + * @param doc the documentation + * @return the SchemaBuilder + */ + public SchemaBuilder doc(String doc) { + checkNull(DOC_FIELD, this.doc); + this.doc = doc; + return this; + } + + @Override + public Map<String, String> parameters() { + // Return null when no parameters have been set, matching the Schema interface contract. + return parameters == null ? null : Collections.unmodifiableMap(parameters); + } + + /** + * Set a schema parameter. + * @param propertyName name of the schema property to define + * @param propertyValue value of the schema property to define, as a String + * @return the SchemaBuilder + */ + public SchemaBuilder parameter(String propertyName, String propertyValue) { + // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types + // can print their properties in a consistent order. + if (parameters == null) + parameters = new LinkedHashMap<>(); + parameters.put(propertyName, propertyValue); + return this; + } + + /** + * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in + * the set of properties passed to this method. + * @param props Map of properties to set + * @return the SchemaBuilder + */ + public SchemaBuilder parameters(Map<String, String> props) { + // Avoid creating an empty set of properties so we never have an empty map + if (props.isEmpty()) + return this; + if (parameters == null) + parameters = new LinkedHashMap<>(); + parameters.putAll(props); + return this; + } + + @Override + public Type type() { + return type; + } + + /** + * Create a SchemaBuilder for the specified type. + * + * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form + * can be useful when generating schemas dynamically.
+ * + * @param type the schema type + * @return a new SchemaBuilder + */ + public static SchemaBuilder type(Type type) { + return new SchemaBuilder(type); + } + + // Primitive types + + /** + * @return a new {@link Type#INT8} SchemaBuilder + */ + public static SchemaBuilder int8() { + return new SchemaBuilder(Type.INT8); + } + + /** + * @return a new {@link Type#INT16} SchemaBuilder + */ + public static SchemaBuilder int16() { + return new SchemaBuilder(Type.INT16); + } + + /** + * @return a new {@link Type#INT32} SchemaBuilder + */ + public static SchemaBuilder int32() { + return new SchemaBuilder(Type.INT32); + } + + /** + * @return a new {@link Type#INT64} SchemaBuilder + */ + public static SchemaBuilder int64() { + return new SchemaBuilder(Type.INT64); + } + + /** + * @return a new {@link Type#FLOAT32} SchemaBuilder + */ + public static SchemaBuilder float32() { + return new SchemaBuilder(Type.FLOAT32); + } + + /** + * @return a new {@link Type#FLOAT64} SchemaBuilder + */ + public static SchemaBuilder float64() { + return new SchemaBuilder(Type.FLOAT64); + } + + /** + * @return a new {@link Type#BOOLEAN} SchemaBuilder + */ + public static SchemaBuilder bool() { + return new SchemaBuilder(Type.BOOLEAN); + } + + /** + * @return a new {@link Type#STRING} SchemaBuilder + */ + public static SchemaBuilder string() { + return new SchemaBuilder(Type.STRING); + } + + /** + * @return a new {@link Type#BYTES} SchemaBuilder + */ + public static SchemaBuilder bytes() { + return new SchemaBuilder(Type.BYTES); + } + + + // Structs + + /** + * @return a new {@link Type#STRUCT} SchemaBuilder + */ + public static SchemaBuilder struct() { + return new SchemaBuilder(Type.STRUCT); + } + + /** + * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema. + * @param fieldName the name of the field to add + * @param fieldSchema the Schema for the field's value + * @return the SchemaBuilder + */ + public SchemaBuilder field(String fieldName, Schema fieldSchema) { + if (type != Type.STRUCT) + throw new SchemaBuilderException("Cannot create fields on type " + type); + if (fields == null) + fields = new ArrayList<>(); + int fieldIndex = fields.size(); + fields.add(new Field(fieldName, fieldIndex, fieldSchema)); + return this; + } + + /** + * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct. 
+ * @return the list of fields for this Schema + */ + public List<Field> fields() { + if (type != Type.STRUCT) + throw new DataException("Cannot list fields on non-struct type"); + return fields; + } + + public Field field(String fieldName) { + if (type != Type.STRUCT) + throw new DataException("Cannot look up fields on non-struct type"); + for (Field field : fields) + if (field.name().equals(fieldName)) + return field; + return null; + } + + + + // Maps & Arrays + + /** + * @param valueSchema the schema for elements of the array + * @return a new {@link Type#ARRAY} SchemaBuilder + */ + public static SchemaBuilder array(Schema valueSchema) { + SchemaBuilder builder = new SchemaBuilder(Type.ARRAY); + builder.valueSchema = valueSchema; + return builder; + } + + /** + * @param keySchema the schema for keys in the map + * @param valueSchema the schema for values in the map + * @return a new {@link Type#MAP} SchemaBuilder + */ + public static SchemaBuilder map(Schema keySchema, Schema valueSchema) { + SchemaBuilder builder = new SchemaBuilder(Type.MAP); + builder.keySchema = keySchema; + builder.valueSchema = valueSchema; + return builder; + } + + @Override + public Schema keySchema() { + return keySchema; + } + + @Override + public Schema valueSchema() { + return valueSchema; + } + + + /** + * Build the Schema using the current settings + * @return the {@link Schema} + */ + public Schema build() { + return new ConnectSchema(type, isOptional(), defaultValue, name, version, doc, + parameters == null ? null : Collections.unmodifiableMap(parameters), + fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema); + } + + /** + * Return a concrete instance of the {@link Schema} specified by this builder + * @return the {@link Schema} + */ + @Override + public Schema schema() { + return build(); + } + + + private static void checkNull(String fieldName, Object val) { + if (val != null) + throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set."); + } + + private static void checkNotNull(String fieldName, Object val, String fieldToSet) { + if (val == null) + throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet); + } +}
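A short sketch of the fluent API above, combining the struct, array, and map builders; the schema names and the wrapper class are invented for the example:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    public class SchemaBuilderExample {
        public static void main(String[] args) {
            // A struct schema with metadata and an optional field carrying a default.
            Schema userSchema = SchemaBuilder.struct()
                    .name("com.example.User").version(1).doc("A user record.")
                    .field("username", Schema.STRING_SCHEMA)
                    .field("age", SchemaBuilder.int32().optional().defaultValue(0).build())
                    .build();

            // A map keyed by strings, holding arrays of the struct above.
            Schema groupsSchema = SchemaBuilder.map(
                    Schema.STRING_SCHEMA,
                    SchemaBuilder.array(userSchema).build()
            ).build();

            System.out.println(userSchema.field("age").schema().isOptional());    // true
            System.out.println(groupsSchema.valueSchema().valueSchema().name());  // com.example.User
        }
    }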
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java new file mode 100644 index 0000000..ad0caf8 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.data.Schema.Type; +import org.apache.kafka.connect.errors.SchemaProjectorException; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** + * <p> + * SchemaProjector is a utility to project a value between compatible schemas, throwing exceptions + * when incompatible schemas are provided. + * </p> + */ + +public class SchemaProjector { + + private static Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>(); + + static { + Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64}; + for (int i = 0; i < promotableTypes.length; ++i) { + for (int j = i; j < promotableTypes.length; ++j) { + promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j])); + } + } + } + + /** + * This method projects a value from a source schema to a compatible target schema, throwing exceptions when incompatible schemas are provided + * @param source the schema used to construct the record + * @param record the value to project from source schema to target schema + * @param target the schema to project the record to + * @return the projected value with target schema + * @throws SchemaProjectorException if the schemas are not compatible + */ + public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException { + checkMaybeCompatible(source, target); + if (source.isOptional() && !target.isOptional()) { + if (target.defaultValue() != null) { + if (record != null) { + return projectRequiredSchema(source, record, target); + } else { + return target.defaultValue(); + } + } else { + throw new SchemaProjectorException("Source schema is optional, but the target schema does not provide a default value."); + } + } else { + if (record != null) { + return projectRequiredSchema(source, record, target); + } else { + return null; + } + } + } + + private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException { + switch (target.type()) { + case INT8: + case INT16: + case INT32: + case INT64: + case FLOAT32: + case FLOAT64: + case BOOLEAN: + case BYTES: + case STRING: + return projectPrimitive(source, record, target); + case STRUCT: + return projectStruct(source, (Struct) record, target); + case ARRAY: + return projectArray(source, record, target); + case MAP: + return projectMap(source, record, target); + } + return null; + } + + private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException { + Struct targetStruct = new Struct(target); + for (Field targetField : target.fields()) { + String fieldName = targetField.name(); + Field sourceField = source.field(fieldName); + if (sourceField != null) { + Object sourceFieldValue = sourceStruct.get(fieldName); + try { + Object targetFieldValue = project(sourceField.schema(), sourceFieldValue, targetField.schema()); + targetStruct.put(fieldName, targetFieldValue); + } catch (SchemaProjectorException e) { + throw new SchemaProjectorException("Error projecting " + sourceField.name(), e); + } + } else { + Object targetDefault; + if (targetField.schema().defaultValue() != null) { + targetDefault = targetField.schema().defaultValue(); + } else { + throw
new SchemaProjectorException("Cannot project " + source.schema() + " to " + target.schema()); + } + targetStruct.put(fieldName, targetDefault); + } + } + return targetStruct; + } + + + private static void checkMaybeCompatible(Schema source, Schema target) { + if (source.type() != target.type() && !isPromotable(source.type(), target.type())) { + throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type() + " and target type: " + target.type()); + } else if (!Objects.equals(source.name(), target.name())) { + throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name() + " and target name: " + target.name()); + } else if (!Objects.equals(source.parameters(), target.parameters())) { + throw new SchemaProjectorException("Schema parameters not equal. source parameters: " + source.parameters() + " and target parameters: " + target.parameters()); + } + } + + private static Object projectArray(Schema source, Object record, Schema target) throws SchemaProjectorException { + List<?> array = (List<?>) record; + List<Object> retArray = new ArrayList<>(); + for (Object entry : array) { + retArray.add(project(source.valueSchema(), entry, target.valueSchema())); + } + return retArray; + } + + private static Object projectMap(Schema source, Object record, Schema target) throws SchemaProjectorException { + Map<?, ?> map = (Map<?, ?>) record; + Map<Object, Object> retMap = new HashMap<>(); + for (Map.Entry<?, ?> entry : map.entrySet()) { + Object key = entry.getKey(); + Object value = entry.getValue(); + Object retKey = project(source.keySchema(), key, target.keySchema()); + Object retValue = project(source.valueSchema(), value, target.valueSchema()); + retMap.put(retKey, retValue); + } + return retMap; + } + + private static Object projectPrimitive(Schema source, Object record, Schema target) throws SchemaProjectorException { + assert source.type().isPrimitive(); + assert target.type().isPrimitive(); + Object result; + if (isPromotable(source.type(), target.type())) { + Number numberRecord = (Number) record; + switch (target.type()) { + case INT8: + result = numberRecord.byteValue(); + break; + case INT16: + result = numberRecord.shortValue(); + break; + case INT32: + result = numberRecord.intValue(); + break; + case INT64: + result = numberRecord.longValue(); + break; + case FLOAT32: + result = numberRecord.floatValue(); + break; + case FLOAT64: + result = numberRecord.doubleValue(); + break; + default: + throw new SchemaProjectorException("Not promotable type."); + } + } else { + result = record; + } + return result; + } + + private static boolean isPromotable(Type sourceType, Type targetType) { + return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType)); + } +}
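A minimal sketch of how the projector above behaves (class name invented): INT32 promotes to INT64, and an optional source may project onto a required target only if that target carries a default value.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.SchemaProjector;

    public class SchemaProjectorExample {
        public static void main(String[] args) {
            // INT32 promotes to INT64, so this projection succeeds.
            Schema source = SchemaBuilder.int32().build();
            Schema target = SchemaBuilder.int64().build();
            Object projected = SchemaProjector.project(source, 42, target);
            System.out.println(projected.getClass()); // class java.lang.Long

            // An optional source can only project to a required target
            // if the target supplies a default value for missing data.
            Schema optionalSource = SchemaBuilder.int32().optional().build();
            Schema requiredTarget = SchemaBuilder.int32().defaultValue(-1).build();
            System.out.println(SchemaProjector.project(optionalSource, null, requiredTarget)); // -1
        }
    }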
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java new file mode 100644 index 0000000..4ca37c3 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * <p> + * A structured record containing a set of named fields with values, each field using an independent {@link Schema}. + * Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set. + * </p> + * <p> + * The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing + * complete objects: + * <pre> + * Schema schema = SchemaBuilder.struct().name("com.example.Person") + * .field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build(); + * Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21); + * </pre> + * </p> + */ +public class Struct { + + private final Schema schema; + private final Object[] values; + + /** + * Create a new Struct for this {@link Schema} + * @param schema the {@link Schema} for the Struct + */ + public Struct(Schema schema) { + if (schema.type() != Schema.Type.STRUCT) + throw new DataException("Not a struct schema: " + schema); + this.schema = schema; + this.values = new Object[schema.fields().size()]; + } + + /** + * Get the schema for this Struct. + * @return the Struct's schema + */ + public Schema schema() { + return schema; + } + + /** + * Get the value of a field, returning the default value if no value has been set yet and a default value is specified + * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and + * must be cast to a more specific type. + * @param fieldName the field name to lookup + * @return the value for the field + */ + public Object get(String fieldName) { + Field field = lookupField(fieldName); + return get(field); + } + + /** + * Get the value of a field, returning the default value if no value has been set yet and a default value is specified + * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and + * must be cast to a more specific type. + * @param field the field to lookup + * @return the value for the field + */ + public Object get(Field field) { + Object val = values[field.index()]; + if (val == null && field.schema().defaultValue() != null) { + val = field.schema().defaultValue(); + } + return val; + } + + /** + * Get the underlying raw value for the field without accounting for default values.
+ * @param fieldName the field to get the value of + * @return the raw value + */ + public Object getWithoutDefault(String fieldName) { + Field field = lookupField(fieldName); + return values[field.index()]; + } + + // Note that all getters have to have boxed return types since the fields might be optional + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Byte. + */ + public Byte getInt8(String fieldName) { + return (Byte) getCheckType(fieldName, Schema.Type.INT8); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Short. + */ + public Short getInt16(String fieldName) { + return (Short) getCheckType(fieldName, Schema.Type.INT16); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to an Integer. + */ + public Integer getInt32(String fieldName) { + return (Integer) getCheckType(fieldName, Schema.Type.INT32); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Long. + */ + public Long getInt64(String fieldName) { + return (Long) getCheckType(fieldName, Schema.Type.INT64); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Float. + */ + public Float getFloat32(String fieldName) { + return (Float) getCheckType(fieldName, Schema.Type.FLOAT32); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Double. + */ + public Double getFloat64(String fieldName) { + return (Double) getCheckType(fieldName, Schema.Type.FLOAT64); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Boolean. + */ + public Boolean getBoolean(String fieldName) { + return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a String. + */ + public String getString(String fieldName) { + return (String) getCheckType(fieldName, Schema.Type.STRING); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a byte[]. + */ + public byte[] getBytes(String fieldName) { + Object bytes = getCheckType(fieldName, Schema.Type.BYTES); + if (bytes instanceof ByteBuffer) + return ((ByteBuffer) bytes).array(); + return (byte[]) bytes; + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a List. + */ + public <T> List<T> getArray(String fieldName) { + return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Map. + */ + public <K, V> Map<K, V> getMap(String fieldName) { + return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP); + } + + /** + * Equivalent to calling {@link #get(String)} and casting the result to a Struct. + */ + public Struct getStruct(String fieldName) { + return (Struct) getCheckType(fieldName, Schema.Type.STRUCT); + } + + /** + * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's + * {@link Schema}. + * @param fieldName the name of the field to set + * @param value the value of the field + * @return the Struct, to allow chaining of {@link #put(String, Object)} calls + */ + public Struct put(String fieldName, Object value) { + Field field = lookupField(fieldName); + return put(field, value); + } + + /** + * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's + * {@link Schema}.
+ * @param field the field to set + * @param value the value of the field + * @return the Struct, to allow chaining of {@link #put(String, Object)} calls + */ + public Struct put(Field field, Object value) { + ConnectSchema.validateValue(field.schema(), value); + values[field.index()] = value; + return this; + } + + + /** + * Validates that this struct has filled in all the necessary data with valid values. For required fields + * without defaults, this validates that a value has been set and has matching types/schemas. If any validation + * fails, throws a DataException. + */ + public void validate() { + for (Field field : schema.fields()) { + Schema fieldSchema = field.schema(); + Object value = values[field.index()]; + if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) + continue; + ConnectSchema.validateValue(fieldSchema, value); + } + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Struct struct = (Struct) o; + return Objects.equals(schema, struct.schema) && + Arrays.equals(values, struct.values); + } + + @Override + public int hashCode() { + return Objects.hash(schema, Arrays.hashCode(values)); + } + + private Field lookupField(String fieldName) { + Field field = schema.field(fieldName); + if (field == null) + throw new DataException(fieldName + " is not a valid field name"); + return field; + } + + // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't. + // Used to implement the get*() methods that return typed data instead of Object + private Object getCheckType(String fieldName, Schema.Type type) { + Field field = lookupField(fieldName); + if (field.schema().type() != type) + throw new DataException("Field '" + fieldName + "' is not of type " + type); + return values[field.index()]; + } + +}
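A short sketch of the Struct API above; the schema and values are invented for the example:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;
    import org.apache.kafka.connect.data.Struct;

    public class StructExample {
        public static void main(String[] args) {
            Schema schema = SchemaBuilder.struct().name("com.example.Person")
                    .field("name", Schema.STRING_SCHEMA)
                    .field("age", Schema.OPTIONAL_INT32_SCHEMA)
                    .build();

            // put() returns the Struct, so calls can be chained.
            Struct struct = new Struct(schema)
                    .put("name", "Bobby McGee")
                    .put("age", 21);

            // Throws DataException if a required field is missing or mistyped.
            struct.validate();

            String name = struct.getString("name"); // typed accessor
            Integer age = struct.getInt32("age");   // boxed, since the field is optional
            System.out.println(name + " is " + age);
        }
    }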
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java new file mode 100644 index 0000000..cecd891 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Time.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; + +import java.util.Calendar; +import java.util.TimeZone; + +/** + * <p> + * A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a + * java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a + * point in time during the first day after the Unix epoch. The underlying representation is an integer + * representing the number of milliseconds after midnight. + * </p> + */ +public class Time { + public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Time"; + + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + /** + * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such + * as required/optional, default value, and documentation. + * @return a SchemaBuilder + */ + public static SchemaBuilder builder() { + return SchemaBuilder.int32() + .name(LOGICAL_NAME) + .version(1); + } + + public static final Schema SCHEMA = builder().schema(); + + /** + * Convert a value from its logical format (Time) to its encoded format. + * @param value the logical value + * @return the encoded value + */ + public static int fromLogical(Schema schema, java.util.Date value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Time object but the schema does not match."); + Calendar calendar = Calendar.getInstance(UTC); + calendar.setTime(value); + long unixMillis = calendar.getTimeInMillis(); + if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) { + throw new DataException("Kafka Connect Time type should not have any date fields set to non-zero values."); + } + return (int) unixMillis; + } + + public static java.util.Date toLogical(Schema schema, int value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Time object but the schema does not match."); + if (value < 0 || value > MILLIS_PER_DAY) + throw new DataException("Time values must use a number of milliseconds between 0 and 86400000"); + return new java.util.Date(value); + } +}
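A corresponding sketch for the Time logical type above (class name and values invented for illustration):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Time;

    import java.util.Calendar;
    import java.util.TimeZone;

    public class TimeExample {
        public static void main(String[] args) {
            // 14:30 UTC on the first day after the epoch, with date fields zeroed.
            Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
            calendar.clear();
            calendar.set(Calendar.HOUR_OF_DAY, 14);
            calendar.set(Calendar.MINUTE, 30);

            Schema schema = Time.SCHEMA;
            // Encodes as milliseconds after midnight: 14 * 3600000 + 30 * 60000 = 52200000.
            int millis = Time.fromLogical(schema, calendar.getTime());
            java.util.Date roundTrip = Time.toLogical(schema, millis);
            System.out.println(millis + " ms -> " + roundTrip);
        }
    }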
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java new file mode 100644 index 0000000..c447f6d --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Timestamp.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import org.apache.kafka.connect.errors.DataException; + +import java.util.TimeZone; + +/** + * <p> + * A timestamp representing an absolute time, without timezone information. The corresponding Java type is a + * java.util.Date. The underlying representation is a long representing the number of milliseconds since Unix epoch. + * </p> + */ +public class Timestamp { + public static final String LOGICAL_NAME = "org.apache.kafka.connect.data.Timestamp"; + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + /** + * Returns a SchemaBuilder for a Timestamp. By returning a SchemaBuilder you can override additional schema settings such + * as required/optional, default value, and documentation. + * @return a SchemaBuilder + */ + public static SchemaBuilder builder() { + return SchemaBuilder.int64() + .name(LOGICAL_NAME) + .version(1); + } + + public static final Schema SCHEMA = builder().schema(); + + /** + * Convert a value from its logical format (Date) to its encoded format. + * @param value the logical value + * @return the encoded value + */ + public static long fromLogical(Schema schema, java.util.Date value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Timestamp object but the schema does not match."); + return value.getTime(); + } + + public static java.util.Date toLogical(Schema schema, long value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Timestamp object but the schema does not match."); + return new java.util.Date(value); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java new file mode 100644 index 0000000..1202be3 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/ConnectException.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.errors; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * ConnectException is the top-level exception type generated by Kafka Connect and connector implementations.
+ */ +@InterfaceStability.Unstable +public class ConnectException extends KafkaException { + + public ConnectException(String s) { + super(s); + } + + public ConnectException(String s, Throwable throwable) { + super(s, throwable); + } + + public ConnectException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java new file mode 100644 index 0000000..75426a3 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/DataException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.errors; + +/** + * Base class for all Kafka Connect data API exceptions. + */ +public class DataException extends ConnectException { + public DataException(String s) { + super(s); + } + + public DataException(String s, Throwable throwable) { + super(s, throwable); + } + + public DataException(Throwable throwable) { + super(throwable); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java new file mode 100644 index 0000000..8310059 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
new file mode 100644
index 0000000..8310059
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/IllegalWorkerStateException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates that a method has been invoked illegally or at an invalid time by a connector or task.
+ */
+public class IllegalWorkerStateException extends ConnectException {
+    public IllegalWorkerStateException(String s) {
+        super(s);
+    }
+
+    public IllegalWorkerStateException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public IllegalWorkerStateException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
new file mode 100644
index 0000000..6f0e551
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaBuilderException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates an error while constructing a schema via SchemaBuilder.
+ */
+public class SchemaBuilderException extends DataException {
+    public SchemaBuilderException(String s) {
+        super(s);
+    }
+
+    public SchemaBuilderException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public SchemaBuilderException(Throwable throwable) {
+        super(throwable);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
new file mode 100644
index 0000000..3e2a763
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/errors/SchemaProjectorException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ **/
+package org.apache.kafka.connect.errors;
+
+/**
+ * Indicates an error while projecting data between schemas.
+ */
+public class SchemaProjectorException extends DataException {
+    public SchemaProjectorException(String s) {
+        super(s);
+    }
+
+    public SchemaProjectorException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public SchemaProjectorException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
new file mode 100644
index 0000000..fbe6975
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkConnector.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Connector;
+
+/**
+ * SinkConnectors implement the Connector interface to send Kafka data to another system.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkConnector extends Connector {
+
+    /**
+     * <p>
+     * Configuration key for the list of input topics for this connector.
+     * </p>
+     * <p>
+     * Usually this setting is only relevant to the Kafka Connect framework, but is provided here for
+     * the convenience of Connector developers if they also need to know the set of topics.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+}
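To show how TOPICS_CONFIG and the connector lifecycle fit together, a hedged skeleton of a concrete SinkConnector. The class names are hypothetical, and the overridden lifecycle methods (version, start, taskClass, taskConfigs, stop) come from the Connector base class, which is not included in this excerpt. LoggingSinkTask is sketched after SinkTask.java below:

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LoggingSinkConnector extends SinkConnector {
    private Map<String, String> configProps;

    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // Keep the original properties (including the "topics" setting) to hand to tasks.
        configProps = new HashMap<>(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return LoggingSinkTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // This trivial connector gives every task an identical copy of its configuration.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++)
            configs.add(new HashMap<>(configProps));
        return configs;
    }

    @Override
    public void stop() {
        // Nothing to clean up.
    }
}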
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
new file mode 100644
index 0000000..0bd0f6f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
+ * the record in the Kafka topic-partition in addition to the standard fields. This information
+ * should be used by the SinkTask to coordinate kafkaOffset commits.
+ */
+@InterfaceStability.Unstable
+public class SinkRecord extends ConnectRecord {
+    private final long kafkaOffset;
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) {
+        super(topic, partition, keySchema, key, valueSchema, value);
+        this.kafkaOffset = kafkaOffset;
+    }
+
+    public long kafkaOffset() {
+        return kafkaOffset;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        if (!super.equals(o))
+            return false;
+
+        SinkRecord that = (SinkRecord) o;
+
+        if (kafkaOffset != that.kafkaOffset)
+            return false;
+
+        return true;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = super.hashCode();
+        result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32));
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "SinkRecord{" +
+                "kafkaOffset=" + kafkaOffset +
+                "} " + super.toString();
+    }
+}
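For concreteness, a record as a sink task would receive it. The topic, schemas, values, and offset below are made up, and the primitive Schema constants are assumed from the data API rather than shown in this excerpt:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordExample {
    public static void main(String[] args) {
        SinkRecord record = new SinkRecord("metrics", 0,
                Schema.STRING_SCHEMA, "host-1",   // key schema and key
                Schema.INT64_SCHEMA, 42L,         // value schema and value
                1234L);                           // offset of the record in the topic-partition
        System.out.println(record.kafkaOffset()); // 1234
    }
}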
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
new file mode 100644
index 0000000..7e793c8
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTask.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Task;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * SinkTask is a Task that takes records loaded from Kafka and sends them to another system. In
+ * addition to the basic {@link #put} interface, SinkTasks must also implement {@link #flush}
+ * to support offset commits.
+ */
+@InterfaceStability.Unstable
+public abstract class SinkTask implements Task {
+
+    /**
+     * <p>
+     * The configuration key that provides the list of topics that are inputs for this
+     * SinkTask.
+     * </p>
+     */
+    public static final String TOPICS_CONFIG = "topics";
+
+    protected SinkTaskContext context;
+
+    public void initialize(SinkTaskContext context) {
+        this.context = context;
+    }
+
+    /**
+     * Start the Task. This should handle any configuration parsing and one-time setup of the task.
+     * @param props initial configuration
+     */
+    @Override
+    public abstract void start(Map<String, String> props);
+
+    /**
+     * Put the records in the sink. Usually this should send the records to the sink asynchronously
+     * and immediately return.
+     *
+     * If this operation fails, the SinkTask may throw a {@link org.apache.kafka.connect.errors.RetriableException} to
+     * indicate that the framework should attempt to retry the same call again. Other exceptions will cause the task to
+     * be stopped immediately. {@link SinkTaskContext#timeout(long)} can be used to set the maximum time before the
+     * batch will be retried.
+     *
+     * @param records the set of records to send
+     */
+    public abstract void put(Collection<SinkRecord> records);
+
+    /**
+     * Flush all records that have been {@link #put} for the specified topic-partitions. The
+     * offsets are provided for convenience, but could also be determined by tracking all offsets
+     * included in the SinkRecords passed to {@link #put}.
+     *
+     * @param offsets mapping of TopicPartition to committed offset
+     */
+    public abstract void flush(Map<TopicPartition, OffsetAndMetadata> offsets);
+
+    /**
+     * The SinkTask uses this method to create writers for newly assigned partitions in case of partition
+     * re-assignment. In partition re-assignment, some new partitions may be assigned to the SinkTask.
+     * The SinkTask needs to create writers and perform necessary recovery for the newly assigned partitions.
+     * This method will be called after partition re-assignment completes and before the SinkTask starts
+     * fetching data.
+     * @param partitions The list of partitions that are now assigned to the task (may include
+     *                   partitions previously assigned to the task)
+     */
+    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+    }
+
+    /**
+     * The SinkTask uses this method to close writers and commit offsets for partitions that are no
+     * longer assigned to the SinkTask. This method will be called before a rebalance operation starts
+     * and after the SinkTask stops fetching data.
+     * @param partitions The list of partitions that were assigned to the consumer on the last
+     *                   rebalance
+     */
+    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+    }
+
+    /**
+     * Perform any cleanup to stop this task. In SinkTasks, this method is invoked only once outstanding calls to other
+     * methods have completed (e.g., {@link #put(Collection)} has returned) and a final {@link #flush(Map)} and offset
+     * commit has completed. Implementations of this method should only need to perform final cleanup operations, such
+     * as closing network connections to the sink system.
+     */
+    public abstract void stop();
+}
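A matching hedged sketch of the task side, pairing with the LoggingSinkConnector above. It is illustrative only, and version() is assumed to come from the Task interface, which is not shown in this excerpt:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.util.Collection;
import java.util.Map;

public class LoggingSinkTask extends SinkTask {
    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        // One-time setup; a real task would open its connection to the sink system here.
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        // Writes synchronously, so nothing is ever left buffered for flush().
        for (SinkRecord record : records)
            System.out.println(record.kafkaOffset() + ": " + record.value());
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
        // All records were written in put(), so the offsets can be committed as-is.
    }

    @Override
    public void stop() {
        // Nothing to close.
    }
}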
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
new file mode 100644
index 0000000..2202cae
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkTaskContext.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Context passed to SinkTasks, allowing them to access utilities in the Kafka Connect runtime.
+ */
+@InterfaceStability.Unstable
+public interface SinkTaskContext {
+    /**
+     * Reset the consumer offsets for the given topic partitions. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the SinkTask is started or a rebalance occurs, the task
+     * would reload offsets from HDFS and use this method to reset the consumer to those offsets.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this method.
+     *
+     * @param offsets map of offsets for topic partitions
+     */
+    void offset(Map<TopicPartition, Long> offsets);
+
+    /**
+     * Reset the consumer offset for the given topic partition. SinkTasks should use this if they manage offsets
+     * in the sink data store rather than using Kafka consumer offsets. For example, an HDFS connector might record
+     * offsets in HDFS to provide exactly once delivery. When the topic partition is recovered the task
+     * would reload offsets from HDFS and use this method to reset the consumer to the offset.
+     *
+     * SinkTasks that do not manage their own offsets do not need to use this method.
+     *
+     * @param tp the topic partition whose offset to reset.
+     * @param offset the offset to reset to.
+     */
+    void offset(TopicPartition tp, long offset);
+
+    /**
+     * Set the timeout in milliseconds. SinkTasks should use this to indicate that they need to retry certain
+     * operations after the timeout. SinkTasks may have certain operations on external systems that may need
+     * to be retried in case of failures. For example, appending a record to an HDFS file may fail due to temporary
+     * network issues. SinkTasks use this method to set how long to wait before retrying.
+     * @param timeoutMs the backoff timeout in milliseconds.
+     */
+    void timeout(long timeoutMs);
+
+    /**
+     * Get the current set of assigned TopicPartitions for this task.
+     * @return the set of currently assigned TopicPartitions
+     */
+    Set<TopicPartition> assignment();
+
+    /**
+     * Pause consumption of messages from the specified TopicPartitions.
+     * @param partitions the partitions which should be paused
+     */
+    void pause(TopicPartition... partitions);
+
+    /**
+     * Resume consumption of messages from previously paused TopicPartitions.
+     * @param partitions the partitions to resume
+     */
+    void resume(TopicPartition... partitions);
+}
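A hedged sketch of the retry interplay between a SinkTask and this context: on a transient failure the task sets a backoff via timeout(long) and throws the RetriableException mentioned in SinkTask#put's javadoc, so the framework redelivers the same batch. The writeBatch helper, the one-second backoff, and the RetriableException constructor signature (assumed to mirror ConnectException's) are illustrative assumptions:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

import java.io.IOException;
import java.util.Collection;
import java.util.Map;

public class RetryingSinkTask extends SinkTask {
    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        try {
            writeBatch(records);
        } catch (IOException e) {
            // Back off for one second, then let the framework retry the same batch.
            context.timeout(1000L);
            throw new RetriableException("Temporary failure writing batch, will retry", e);
        }
    }

    // Hypothetical helper that appends the batch to an external system.
    private void writeBatch(Collection<SinkRecord> records) throws IOException {
        // ...
    }

    @Override
    public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void stop() {
    }
}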
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
new file mode 100644
index 0000000..2ba5139
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceConnector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.connect.connector.Connector;
+
+/**
+ * SourceConnectors implement the Connector interface to pull data from another system and send
+ * it to Kafka.
+ */
+@InterfaceStability.Unstable
+public abstract class SourceConnector extends Connector {
+
+}
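Finally, a hedged source-side skeleton to mirror the sink examples. SourceTask and SourceRecord are not part of this excerpt, so the stub task's method signatures (poll() in particular, including returning null to mean "no records this round") are assumptions about the rest of the source API rather than anything confirmed by this patch:

import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeartbeatSourceConnector extends SourceConnector {
    private Map<String, String> configProps;

    @Override
    public String version() {
        return "0.1.0";
    }

    @Override
    public void start(Map<String, String> props) {
        configProps = new HashMap<>(props);
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HeartbeatSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        // Every task gets the same configuration; a real connector would partition work here.
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++)
            configs.add(new HashMap<>(configProps));
        return configs;
    }

    @Override
    public void stop() {
    }

    // Stub task so the sketch is self-contained; a real task would track offsets and produce data.
    public static class HeartbeatSourceTask extends SourceTask {
        @Override
        public String version() {
            return "0.1.0";
        }

        @Override
        public void start(Map<String, String> props) {
        }

        @Override
        public List<SourceRecord> poll() throws InterruptedException {
            Thread.sleep(1000L);
            return null; // assumed to signal that no records are available this round
        }

        @Override
        public void stop() {
        }
    }
}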