http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java deleted file mode 100644 index 104abf1..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java +++ /dev/null @@ -1,323 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; - -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.*; - -public class CopycatSchema implements Schema { - /** - * Maps Schema.Types to a list of Java classes that can be used to represent them. - */ - private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>(); - /** - * Maps known logical types to a list of Java classes that can be used to represent them. 
- */ - private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>(); - - /** - * Maps the Java classes to the corresponding Schema.Type. - */ - private static final Map<Class<?>, Type> JAVA_CLASS_SCHEMA_TYPES = new HashMap<>(); - - static { - SCHEMA_TYPE_CLASSES.put(Type.INT8, Arrays.asList((Class) Byte.class)); - SCHEMA_TYPE_CLASSES.put(Type.INT16, Arrays.asList((Class) Short.class)); - SCHEMA_TYPE_CLASSES.put(Type.INT32, Arrays.asList((Class) Integer.class)); - SCHEMA_TYPE_CLASSES.put(Type.INT64, Arrays.asList((Class) Long.class)); - SCHEMA_TYPE_CLASSES.put(Type.FLOAT32, Arrays.asList((Class) Float.class)); - SCHEMA_TYPE_CLASSES.put(Type.FLOAT64, Arrays.asList((Class) Double.class)); - SCHEMA_TYPE_CLASSES.put(Type.BOOLEAN, Arrays.asList((Class) Boolean.class)); - SCHEMA_TYPE_CLASSES.put(Type.STRING, Arrays.asList((Class) String.class)); - // Bytes are special and have 2 representations. byte[] causes problems because it doesn't handle equals() and - // hashCode() like we want objects to, so we support both byte[] and ByteBuffer. 
Using plain byte[] can cause - // those methods to fail, so ByteBuffers are recommended - SCHEMA_TYPE_CLASSES.put(Type.BYTES, Arrays.asList((Class) byte[].class, (Class) ByteBuffer.class)); - SCHEMA_TYPE_CLASSES.put(Type.ARRAY, Arrays.asList((Class) List.class)); - SCHEMA_TYPE_CLASSES.put(Type.MAP, Arrays.asList((Class) Map.class)); - SCHEMA_TYPE_CLASSES.put(Type.STRUCT, Arrays.asList((Class) Struct.class)); - - for (Map.Entry<Type, List<Class>> schemaClasses : SCHEMA_TYPE_CLASSES.entrySet()) { - for (Class<?> schemaClass : schemaClasses.getValue()) - JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey()); - } - - LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class)); - LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); - LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); - LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); - // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for - // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so - // they should not be used without schemas. - } - - // The type of the field - private final Type type; - private final boolean optional; - private final Object defaultValue; - - private final List<Field> fields; - private final Map<String, Field> fieldsByName; - - private final Schema keySchema; - private final Schema valueSchema; - - // Optional name and version provide a built-in way to indicate what type of data is included. Most - // useful for structs to indicate the semantics of the struct and map it to some existing underlying - // serializer-specific schema. However, can also be useful in specifying other logical types (e.g. a set is an array - // with additional constraints). 
- private final String name; - private final Integer version; - // Optional human readable documentation describing this schema. - private final String doc; - private final Map<String, String> parameters; - - /** - * Construct a Schema. Most users should not construct schemas manually, preferring {@link SchemaBuilder} instead. - */ - public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc, Map<String, String> parameters, List<Field> fields, Schema keySchema, Schema valueSchema) { - this.type = type; - this.optional = optional; - this.defaultValue = defaultValue; - this.name = name; - this.version = version; - this.doc = doc; - this.parameters = parameters; - - this.fields = fields; - if (this.fields != null && this.type == Type.STRUCT) { - this.fieldsByName = new HashMap<>(); - for (Field field : fields) - fieldsByName.put(field.name(), field); - } else { - this.fieldsByName = null; - } - - this.keySchema = keySchema; - this.valueSchema = valueSchema; - } - - /** - * Construct a Schema for a primitive type, setting schema parameters, struct fields, and key and value schemas to null. - */ - public CopycatSchema(Type type, boolean optional, Object defaultValue, String name, Integer version, String doc) { - this(type, optional, defaultValue, name, version, doc, null, null, null, null); - } - - /** - * Construct a default schema for a primitive type. The schema is required, has no default value, name, version, - * or documentation. 
- */ - public CopycatSchema(Type type) { - this(type, false, null, null, null, null); - } - - @Override - public Type type() { - return type; - } - - @Override - public boolean isOptional() { - return optional; - } - - @Override - public Object defaultValue() { - return defaultValue; - } - - @Override - public String name() { - return name; - } - - @Override - public Integer version() { - return version; - } - - @Override - public String doc() { - return doc; - } - - @Override - public Map<String, String> parameters() { - return parameters; - } - - @Override - public List<Field> fields() { - if (type != Type.STRUCT) - throw new DataException("Cannot list fields on non-struct type"); - return fields; - } - - public Field field(String fieldName) { - if (type != Type.STRUCT) - throw new DataException("Cannot look up fields on non-struct type"); - return fieldsByName.get(fieldName); - } - - @Override - public Schema keySchema() { - if (type != Type.MAP) - throw new DataException("Cannot look up key schema on non-map type"); - return keySchema; - } - - @Override - public Schema valueSchema() { - if (type != Type.MAP && type != Type.ARRAY) - throw new DataException("Cannot look up value schema on non-array and non-map type"); - return valueSchema; - } - - - - /** - * Validate that the value can be used with the schema, i.e. that its type matches the schema type and nullability - * requirements. Throws a DataException if the value is invalid. 
- * @param schema Schema to test - * @param value value to test - */ - public static void validateValue(Schema schema, Object value) { - if (value == null) { - if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field"); - else - return; - } - - List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name()); - - if (expectedClasses == null) - expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); - - if (expectedClasses == null) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); - - boolean foundMatch = false; - for (Class<?> expectedClass : expectedClasses) { - if (expectedClass.isInstance(value)) { - foundMatch = true; - break; - } - } - if (!foundMatch) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); - - switch (schema.type()) { - case STRUCT: - Struct struct = (Struct) value; - if (!struct.schema().equals(schema)) - throw new DataException("Struct schemas do not match."); - struct.validate(); - break; - case ARRAY: - List<?> array = (List<?>) value; - for (Object entry : array) - validateValue(schema.valueSchema(), entry); - break; - case MAP: - Map<?, ?> map = (Map<?, ?>) value; - for (Map.Entry<?, ?> entry : map.entrySet()) { - validateValue(schema.keySchema(), entry.getKey()); - validateValue(schema.valueSchema(), entry.getValue()); - } - break; - } - } - - /** - * Validate that the value can be used for this schema, i.e. that its type matches the schema type and optional - * requirements. Throws a DataException if the value is invalid. 
- * @param value the value to validate - */ - public void validateValue(Object value) { - validateValue(this, value); - } - - @Override - public CopycatSchema schema() { - return this; - } - - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - CopycatSchema schema = (CopycatSchema) o; - return Objects.equals(optional, schema.optional) && - Objects.equals(type, schema.type) && - Objects.equals(defaultValue, schema.defaultValue) && - Objects.equals(fields, schema.fields) && - Objects.equals(keySchema, schema.keySchema) && - Objects.equals(valueSchema, schema.valueSchema) && - Objects.equals(name, schema.name) && - Objects.equals(version, schema.version) && - Objects.equals(doc, schema.doc) && - Objects.equals(parameters, schema.parameters); - } - - @Override - public int hashCode() { - return Objects.hash(type, optional, defaultValue, fields, keySchema, valueSchema, name, version, doc, parameters); - } - - @Override - public String toString() { - if (name != null) - return "Schema{" + name + ":" + type + "}"; - else - return "Schema{" + type + "}"; - } - - - /** - * Get the {@link Type} associated with the the given class. - * - * @param klass the Class to - * @return the corresponding type, nor null if there is no matching type - */ - public static Type schemaType(Class<?> klass) { - synchronized (JAVA_CLASS_SCHEMA_TYPES) { - Type schemaType = JAVA_CLASS_SCHEMA_TYPES.get(klass); - if (schemaType != null) - return schemaType; - - // Since the lookup only checks the class, we need to also try - for (Map.Entry<Class<?>, Type> entry : JAVA_CLASS_SCHEMA_TYPES.entrySet()) { - try { - klass.asSubclass(entry.getKey()); - // Cache this for subsequent lookups - JAVA_CLASS_SCHEMA_TYPES.put(klass, entry.getValue()); - return entry.getValue(); - } catch (ClassCastException e) { - // Expected, ignore - } - } - } - return null; - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java deleted file mode 100644 index 4e14659..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Date.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; - -import java.util.Calendar; -import java.util.TimeZone; - -/** - * <p> - * A date representing a calendar day with no time of day or timezone. The corresponding Java type is a java.util.Date - * with hours, minutes, seconds, milliseconds set to 0. The underlying representation is an integer representing the - * number of standardized days (based on a number of milliseconds with 24 hours/day, 60 minutes/hour, 60 seconds/minute, - * 1000 milliseconds/second with n) since Unix epoch. 
- * </p> - */ -public class Date { - public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Date"; - - private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; - - private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); - - /** - * Returns a SchemaBuilder for a Date. By returning a SchemaBuilder you can override additional schema settings such - * as required/optional, default value, and documentation. - * @return a SchemaBuilder - */ - public static SchemaBuilder builder() { - return SchemaBuilder.int32() - .name(LOGICAL_NAME) - .version(1); - } - - public static final Schema SCHEMA = builder().schema(); - - /** - * Convert a value from its logical format (Date) to it's encoded format. - * @param value the logical value - * @return the encoded value - */ - public static int fromLogical(Schema schema, java.util.Date value) { - if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) - throw new DataException("Requested conversion of Date object but the schema does not match."); - Calendar calendar = Calendar.getInstance(UTC); - calendar.setTime(value); - if (calendar.get(Calendar.HOUR_OF_DAY) != 0 || calendar.get(Calendar.MINUTE) != 0 || - calendar.get(Calendar.SECOND) != 0 || calendar.get(Calendar.MILLISECOND) != 0) { - throw new DataException("Copycat Date type should not have any time fields set to non-zero values."); - } - long unixMillis = calendar.getTimeInMillis(); - return (int) (unixMillis / MILLIS_PER_DAY); - } - - public static java.util.Date toLogical(Schema schema, int value) { - if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) - throw new DataException("Requested conversion of Date object but the schema does not match."); - return new java.util.Date(value * MILLIS_PER_DAY); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java ---------------------------------------------------------------------- diff --git 
a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java deleted file mode 100644 index f23e13e..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Decimal.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import org.apache.kafka.copycat.errors.DataException; - -import java.math.BigDecimal; -import java.math.BigInteger; - -/** - * <p> - * An arbitrary-precision signed decimal number. The value is unscaled * 10 ^ -scale where: - * <ul> - * <li>unscaled is an integer </li> - * <li>scale is an integer representing how many digits the decimal point should be shifted on the unscaled value</li> - * </ul> - * </p> - * <p> - * Decimal does not provide a fixed schema because it is parameterized by the scale, which is fixed on the schema - * rather than being part of the value. 
- * </p> - * <p> - * The underlying representation of this type is bytes containing a two's complement integer - * </p> - */ -public class Decimal { - public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Decimal"; - public static final String SCALE_FIELD = "scale"; - - /** - * Returns a SchemaBuilder for a Decimal with the given scale factor. By returning a SchemaBuilder you can override - * additional schema settings such as required/optional, default value, and documentation. - * @param scale the scale factor to apply to unscaled values - * @return a SchemaBuilder - */ - public static SchemaBuilder builder(int scale) { - return SchemaBuilder.bytes() - .name(LOGICAL_NAME) - .parameter(SCALE_FIELD, ((Integer) scale).toString()) - .version(1); - } - - public static Schema schema(int scale) { - return builder(scale).build(); - } - - /** - * Convert a value from its logical format (BigDecimal) to it's encoded format. - * @param value the logical value - * @return the encoded value - */ - public static byte[] fromLogical(Schema schema, BigDecimal value) { - if (value.scale() != scale(schema)) - throw new DataException("BigDecimal has mismatching scale value for given Decimal schema"); - return value.unscaledValue().toByteArray(); - } - - public static BigDecimal toLogical(Schema schema, byte[] value) { - return new BigDecimal(new BigInteger(value), scale(schema)); - } - - private static int scale(Schema schema) { - String scaleString = schema.parameters().get(SCALE_FIELD); - if (scaleString == null) - throw new DataException("Invalid Decimal schema: scale parameter not found."); - try { - return Integer.parseInt(scaleString); - } catch (NumberFormatException e) { - throw new DataException("Invalid scale parameter found in Decimal schema: ", e); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java ---------------------------------------------------------------------- 
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java deleted file mode 100644 index c71cdb4..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Field.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.data; - -import java.util.Objects; - -/** - * <p> - * A field in a {@link Struct}, consisting of a field name, index, and {@link Schema} for the field value. - * </p> - */ -public class Field { - private final String name; - private final int index; - private final Schema schema; - - public Field(String name, int index, Schema schema) { - this.name = name; - this.index = index; - this.schema = schema; - } - - /** - * Get the name of this field. - * @return the name of this field - */ - public String name() { - return name; - } - - - /** - * Get the index of this field within the struct. 
- * @return the index of this field - */ - public int index() { - return index; - } - - /** - * Get the schema of this field - * @return the schema of values of this field - */ - public Schema schema() { - return schema; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Field field = (Field) o; - return Objects.equals(index, field.index) && - Objects.equals(name, field.name) && - Objects.equals(schema, field.schema); - } - - @Override - public int hashCode() { - return Objects.hash(name, index, schema); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java deleted file mode 100644 index 3db01ae..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Schema.java +++ /dev/null @@ -1,163 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import java.util.List; -import java.util.Map; - -/** - * <p> - * Definition of an abstract data type. Data types can be primitive types (integer types, floating point types, - * boolean, strings, and bytes) or complex types (typed arrays, maps with one key schema and value schema, - * and structs that have a fixed set of field names each with an associated value schema). Any type can be specified - * as optional, allowing it to be omitted (resulting in null values when it is missing) and can specify a default - * value. - * </p> - * <p> - * All schemas may have some associated metadata: a name, version, and documentation. These are all considered part - * of the schema itself and included when comparing schemas. Besides adding important metadata, these fields enable - * the specification of logical types that specify additional constraints and semantics (e.g. UNIX timestamps are - * just an int64, but the user needs the know about the additional semantics to interpret it properly). - * </p> - * <p> - * Schemas can be created directly, but in most cases using {@link SchemaBuilder} will be simpler. - * </p> - */ -public interface Schema { - /** - * The type of a schema. These only include the core types; logical types must be determined by checking the schema name. 
- */ - enum Type { - INT8, INT16, INT32, INT64, FLOAT32, FLOAT64, BOOLEAN, STRING, BYTES, ARRAY, MAP, STRUCT; - - private String name; - - Type() { - this.name = this.name().toLowerCase(); - } - - public String getName() { - return name; - } - - public boolean isPrimitive() { - switch (this) { - case INT8: - case INT16: - case INT32: - case INT64: - case FLOAT32: - case FLOAT64: - case BOOLEAN: - case STRING: - case BYTES: - return true; - } - return false; - } - } - - - Schema INT8_SCHEMA = SchemaBuilder.int8().build(); - Schema INT16_SCHEMA = SchemaBuilder.int16().build(); - Schema INT32_SCHEMA = SchemaBuilder.int32().build(); - Schema INT64_SCHEMA = SchemaBuilder.int64().build(); - Schema FLOAT32_SCHEMA = SchemaBuilder.float32().build(); - Schema FLOAT64_SCHEMA = SchemaBuilder.float64().build(); - Schema BOOLEAN_SCHEMA = SchemaBuilder.bool().build(); - Schema STRING_SCHEMA = SchemaBuilder.string().build(); - Schema BYTES_SCHEMA = SchemaBuilder.bytes().build(); - - Schema OPTIONAL_INT8_SCHEMA = SchemaBuilder.int8().optional().build(); - Schema OPTIONAL_INT16_SCHEMA = SchemaBuilder.int16().optional().build(); - Schema OPTIONAL_INT32_SCHEMA = SchemaBuilder.int32().optional().build(); - Schema OPTIONAL_INT64_SCHEMA = SchemaBuilder.int64().optional().build(); - Schema OPTIONAL_FLOAT32_SCHEMA = SchemaBuilder.float32().optional().build(); - Schema OPTIONAL_FLOAT64_SCHEMA = SchemaBuilder.float64().optional().build(); - Schema OPTIONAL_BOOLEAN_SCHEMA = SchemaBuilder.bool().optional().build(); - Schema OPTIONAL_STRING_SCHEMA = SchemaBuilder.string().optional().build(); - Schema OPTIONAL_BYTES_SCHEMA = SchemaBuilder.bytes().optional().build(); - - /** - * @return the type of this schema - */ - Type type(); - - /** - * @return true if this field is optional, false otherwise - */ - boolean isOptional(); - - /** - * @return the default value for this schema - */ - Object defaultValue(); - - /** - * @return the name of this schema - */ - String name(); - - /** - * Get the 
optional version of the schema. If a version is included, newer versions *must* be larger than older ones. - * @return the version of this schema - */ - Integer version(); - - /** - * @return the documentation for this schema - */ - String doc(); - - /** - * Get a map of schema parameters. - * @return Map containing parameters for this schema, or null if there are no parameters - */ - Map<String, String> parameters(); - - /** - * Get the key schema for this map schema. Throws a DataException if this schema is not a map. - * @return the key schema - */ - Schema keySchema(); - - /** - * Get the value schema for this map or array schema. Throws a DataException if this schema is not a map or array. - * @return the value schema - */ - Schema valueSchema(); - - /** - * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct. - * @return the list of fields for this Schema - */ - List<Field> fields(); - - /** - * Get a field for this Schema by name. Throws a DataException if this schema is not a struct. 
- * @param fieldName the name of the field to look up - * @return the Field object for the specified field, or null if there is no field with the given name - */ - Field field(String fieldName); - - /** - * Return a concrete instance of the {@link Schema} - * @return the {@link Schema} - */ - Schema schema(); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java deleted file mode 100644 index 368a8cf..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaAndValue.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.data; - -import java.util.Objects; - -public class SchemaAndValue { - private final Schema schema; - private final Object value; - - public static final SchemaAndValue NULL = new SchemaAndValue(null, null); - - public SchemaAndValue(Schema schema, Object value) { - this.value = value; - this.schema = schema; - } - - public Schema schema() { - return schema; - } - - public Object value() { - return value; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - SchemaAndValue that = (SchemaAndValue) o; - return Objects.equals(schema, that.schema) && - Objects.equals(value, that.value); - } - - @Override - public int hashCode() { - return Objects.hash(schema, value); - } - - @Override - public String toString() { - return "SchemaAndValue{" + - "schema=" + schema + - ", value=" + value + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java deleted file mode 100644 index 21ae54c..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaBuilder.java +++ /dev/null @@ -1,412 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;
import org.apache.kafka.copycat.errors.SchemaBuilderException;

import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * <p>
 *     SchemaBuilder provides a fluent API for constructing {@link Schema} objects. It allows you to set each of the
 *     properties for the schema and each call returns the SchemaBuilder so the calls can be chained. When nested types
 *     are required, use one of the predefined schemas from {@link Schema} or use a second SchemaBuilder inline.
 * </p>
 * <p>
 *     Here is an example of building a struct schema:
 *     <pre>
 *     Schema dateSchema = SchemaBuilder.struct()
 *         .name("com.example.CalendarDate").version(2).doc("A calendar date including month, day, and year.")
 *         .field("month", Schema.STRING_SCHEMA)
 *         .field("day", Schema.INT8_SCHEMA)
 *         .field("year", Schema.INT16_SCHEMA)
 *         .build();
 *     </pre>
 * </p>
 * <p>
 *     Here is an example of using a second SchemaBuilder to construct complex, nested types:
 *     <pre>
 *     Schema userListSchema = SchemaBuilder.array(
 *         SchemaBuilder.struct().name("com.example.User").field("username", Schema.STRING_SCHEMA).field("id", Schema.INT64_SCHEMA).build()
 *     ).build();
 *     </pre>
 * </p>
 * <p>
 *     Most setters may be invoked only once per builder; a second call throws a {@link SchemaBuilderException}.
 * </p>
 */
public class SchemaBuilder implements Schema {
    // Property names used in "already set" / "must be specified" error messages.
    private static final String TYPE_FIELD = "type";
    private static final String OPTIONAL_FIELD = "optional";
    private static final String DEFAULT_FIELD = "default";
    private static final String NAME_FIELD = "name";
    private static final String VERSION_FIELD = "version";
    private static final String DOC_FIELD = "doc";


    private final Type type;
    // null means "not explicitly chosen yet"; treated as required when read.
    private Boolean optional = null;
    private Object defaultValue = null;

    // Only populated for STRUCT builders, lazily on the first field() call.
    private List<Field> fields = null;
    private Schema keySchema = null;
    private Schema valueSchema = null;

    private String name;
    private Integer version;
    // Optional human readable documentation describing this schema.
    private String doc;
    // Additional parameters for logical types.
    private Map<String, String> parameters;

    private SchemaBuilder(Type type) {
        this.type = type;
    }

    // Common/metadata fields

    /**
     * @return true only if {@link #optional()} was called; an unset flag counts as required
     */
    @Override
    public boolean isOptional() {
        return optional != null && optional;
    }

    /**
     * Set this schema as optional.
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if optional/required has already been chosen
     */
    public SchemaBuilder optional() {
        checkNull(OPTIONAL_FIELD, optional);
        optional = true;
        return this;
    }

    /**
     * Set this schema as required. This is the default, but this method can be used to make this choice explicit.
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if optional/required has already been chosen
     */
    public SchemaBuilder required() {
        checkNull(OPTIONAL_FIELD, optional);
        optional = false;
        return this;
    }

    @Override
    public Object defaultValue() {
        return defaultValue;
    }

    /**
     * Set the default value for this schema. The value is validated against the schema type, throwing a
     * {@link SchemaBuilderException} if it does not match.
     * @param value the default value
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a default was already set or the value fails validation
     */
    public SchemaBuilder defaultValue(Object value) {
        checkNull(DEFAULT_FIELD, defaultValue);
        checkNotNull(TYPE_FIELD, type, DEFAULT_FIELD);
        try {
            CopycatSchema.validateValue(this, value);
        } catch (DataException e) {
            // Surface validation failures as builder misuse, keeping the original cause.
            throw new SchemaBuilderException("Invalid default value", e);
        }
        defaultValue = value;
        return this;
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Set the name of this schema.
     * @param name the schema name
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a name has already been set
     */
    public SchemaBuilder name(String name) {
        checkNull(NAME_FIELD, this.name);
        this.name = name;
        return this;
    }

    @Override
    public Integer version() {
        return version;
    }

    /**
     * Set the version of this schema. Schema versions are integers which, if provided, must indicate which schema is
     * newer and which is older by their ordering.
     * @param version the schema version
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if a version has already been set
     */
    public SchemaBuilder version(Integer version) {
        checkNull(VERSION_FIELD, this.version);
        this.version = version;
        return this;
    }

    @Override
    public String doc() {
        return doc;
    }

    /**
     * Set the documentation for this schema.
     * @param doc the documentation
     * @return the SchemaBuilder
     * @throws SchemaBuilderException if documentation has already been set
     */
    public SchemaBuilder doc(String doc) {
        checkNull(DOC_FIELD, this.doc);
        this.doc = doc;
        return this;
    }

    /**
     * @return a read-only view of the parameters set so far, or null if none have been set (the same
     *         convention {@link #build()} uses when handing parameters to the built schema)
     */
    @Override
    public Map<String, String> parameters() {
        // Collections.unmodifiableMap(null) throws NullPointerException, so guard the "nothing set" case.
        return parameters == null ? null : Collections.unmodifiableMap(parameters);
    }

    /**
     * Set a schema parameter.
     * @param propertyName name of the schema property to define
     * @param propertyValue value of the schema property to define, as a String
     * @return the SchemaBuilder
     */
    public SchemaBuilder parameter(String propertyName, String propertyValue) {
        // Preserve order of insertion with a LinkedHashMap. This isn't strictly necessary, but is nice if logical types
        // can print their properties in a consistent order.
        if (parameters == null)
            parameters = new LinkedHashMap<>();
        parameters.put(propertyName, propertyValue);
        return this;
    }

    /**
     * Set schema parameters. This operation is additive; it does not remove existing parameters that do not appear in
     * the set of properties passed to this method.
     * @param props Map of properties to set
     * @return the SchemaBuilder
     */
    public SchemaBuilder parameters(Map<String, String> props) {
        // Avoid creating an empty set of properties so we never have an empty map
        if (props.isEmpty())
            return this;
        if (parameters == null)
            parameters = new LinkedHashMap<>();
        parameters.putAll(props);
        return this;
    }

    @Override
    public Type type() {
        return type;
    }

    /**
     * Create a SchemaBuilder for the specified type.
     *
     * Usually it will be simpler to use one of the variants like {@link #string()} or {@link #struct()}, but this form
     * can be useful when generating schemas dynamically.
     *
     * @param type the schema type
     * @return a new SchemaBuilder
     */
    public static SchemaBuilder type(Type type) {
        return new SchemaBuilder(type);
    }

    // Primitive types

    /**
     * @return a new {@link Type#INT8} SchemaBuilder
     */
    public static SchemaBuilder int8() {
        return new SchemaBuilder(Type.INT8);
    }

    /**
     * @return a new {@link Type#INT16} SchemaBuilder
     */
    public static SchemaBuilder int16() {
        return new SchemaBuilder(Type.INT16);
    }

    /**
     * @return a new {@link Type#INT32} SchemaBuilder
     */
    public static SchemaBuilder int32() {
        return new SchemaBuilder(Type.INT32);
    }

    /**
     * @return a new {@link Type#INT64} SchemaBuilder
     */
    public static SchemaBuilder int64() {
        return new SchemaBuilder(Type.INT64);
    }

    /**
     * @return a new {@link Type#FLOAT32} SchemaBuilder
     */
    public static SchemaBuilder float32() {
        return new SchemaBuilder(Type.FLOAT32);
    }

    /**
     * @return a new {@link Type#FLOAT64} SchemaBuilder
     */
    public static SchemaBuilder float64() {
        return new SchemaBuilder(Type.FLOAT64);
    }

    /**
     * @return a new {@link Type#BOOLEAN} SchemaBuilder
     */
    public static SchemaBuilder bool() {
        return new SchemaBuilder(Type.BOOLEAN);
    }

    /**
     * @return a new {@link Type#STRING} SchemaBuilder
     */
    public static SchemaBuilder string() {
        return new SchemaBuilder(Type.STRING);
    }

    /**
     * @return a new {@link Type#BYTES} SchemaBuilder
     */
    public static SchemaBuilder bytes() {
        return new SchemaBuilder(Type.BYTES);
    }


    // Structs

    /**
     * @return a new {@link Type#STRUCT} SchemaBuilder
     */
    public static SchemaBuilder struct() {
        return new SchemaBuilder(Type.STRUCT);
    }

    /**
     * Add a field to this struct schema. Throws a SchemaBuilderException if this is not a struct schema.
     * @param fieldName the name of the field to add
     * @param fieldSchema the Schema for the field's value
     * @return the SchemaBuilder
     */
    public SchemaBuilder field(String fieldName, Schema fieldSchema) {
        if (type != Type.STRUCT)
            throw new SchemaBuilderException("Cannot create fields on type " + type);
        if (fields == null)
            fields = new ArrayList<>();
        // A field's index is its position in insertion order.
        int fieldIndex = fields.size();
        fields.add(new Field(fieldName, fieldIndex, fieldSchema));
        return this;
    }

    /**
     * Get the list of fields for this Schema. Throws a DataException if this schema is not a struct.
     * @return the list of fields for this Schema; null if no field has been added yet
     */
    @Override
    public List<Field> fields() {
        if (type != Type.STRUCT)
            throw new DataException("Cannot list fields on non-struct type");
        return fields;
    }

    /**
     * Look up a field of this struct schema by name. Throws a DataException if this schema is not a struct.
     * @param fieldName the name of the field to find
     * @return the matching field, or null if no field with that name has been added
     */
    @Override
    public Field field(String fieldName) {
        if (type != Type.STRUCT)
            throw new DataException("Cannot look up fields on non-struct type");
        // No field() call yet: nothing can match.
        if (fields == null)
            return null;
        for (Field field : fields)
            // Compare by value; reference equality only matched interned or identical String instances.
            if (Objects.equals(field.name(), fieldName))
                return field;
        return null;
    }



    // Maps & Arrays

    /**
     * @param valueSchema the schema for elements of the array
     * @return a new {@link Type#ARRAY} SchemaBuilder
     */
    public static SchemaBuilder array(Schema valueSchema) {
        SchemaBuilder builder = new SchemaBuilder(Type.ARRAY);
        builder.valueSchema = valueSchema;
        return builder;
    }

    /**
     * @param keySchema the schema for keys in the map
     * @param valueSchema the schema for values in the map
     * @return a new {@link Type#MAP} SchemaBuilder
     */
    public static SchemaBuilder map(Schema keySchema, Schema valueSchema) {
        SchemaBuilder builder = new SchemaBuilder(Type.MAP);
        builder.keySchema = keySchema;
        builder.valueSchema = valueSchema;
        return builder;
    }

    @Override
    public Schema keySchema() {
        return keySchema;
    }

    @Override
    public Schema valueSchema() {
        return valueSchema;
    }


    /**
     * Build the Schema using the current settings
     * @return the {@link Schema}
     */
    public Schema build() {
        // Parameters and fields are passed as read-only views, or as null when none were set.
        return new CopycatSchema(type, isOptional(), defaultValue, name, version, doc,
                parameters == null ? null : Collections.unmodifiableMap(parameters),
                fields == null ? null : Collections.unmodifiableList(fields), keySchema, valueSchema);
    }

    /**
     * Return a concrete instance of the {@link Schema} specified by this builder
     * @return the {@link Schema}
     */
    @Override
    public Schema schema() {
        return build();
    }


    // Guards single-assignment properties: fails if the property already holds a value.
    private static void checkNull(String fieldName, Object val) {
        if (val != null)
            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " has already been set.");
    }

    // Guards prerequisites: fails if fieldName has no value yet but is needed to set fieldToSet.
    private static void checkNotNull(String fieldName, Object val, String fieldToSet) {
        if (val == null)
            throw new SchemaBuilderException("Invalid SchemaBuilder call: " + fieldName + " must be specified to set " + fieldToSet);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java deleted file mode 100644 index 3ab9e7f..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/SchemaProjector.java +++ /dev/null @@ -1,197 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License.
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - **/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.data.Schema.Type;
import org.apache.kafka.copycat.errors.SchemaProjectorException;

import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
 * <p>
 *     SchemaProjector is a utility to project a value between compatible schemas and throw exceptions
 *     when non-compatible schemas are provided.
 * </p>
 */

public class SchemaProjector {

    // (source type, target type) pairs for which a numeric value may be converted. Populated once below
    // and never modified afterwards.
    private static final Set<AbstractMap.SimpleImmutableEntry<Type, Type>> promotable = new HashSet<>();

    static {
        // Ordered from narrowest to widest; every type is promotable to itself and to any later type.
        Type[] promotableTypes = {Type.INT8, Type.INT16, Type.INT32, Type.INT64, Type.FLOAT32, Type.FLOAT64};
        for (int i = 0; i < promotableTypes.length; ++i) {
            for (int j = i; j < promotableTypes.length; ++j) {
                promotable.add(new AbstractMap.SimpleImmutableEntry<>(promotableTypes[i], promotableTypes[j]));
            }
        }
    }

    /**
     * Project a value between compatible schemas, throwing an exception when incompatible schemas are provided.
     * @param source the schema used to construct the record
     * @param record the value to project from source schema to target schema
     * @param target the schema to project the record to
     * @return the projected value with target schema
     * @throws SchemaProjectorException if the schemas differ in type (and are not promotable), name or parameters,
     *         or if the source is optional while the target is required and has no default value
     */
    public static Object project(Schema source, Object record, Schema target) throws SchemaProjectorException {
        checkMaybeCompatible(source, target);
        if (source.isOptional() && !target.isOptional()) {
            // A null may arrive from the source but the target cannot hold one: a default is mandatory.
            if (target.defaultValue() != null) {
                if (record != null) {
                    return projectRequiredSchema(source, record, target);
                } else {
                    return target.defaultValue();
                }
            } else {
                throw new SchemaProjectorException("Writer schema is optional, however, target schema does not provide a default value.");
            }
        } else {
            if (record != null) {
                return projectRequiredSchema(source, record, target);
            } else {
                return null;
            }
        }
    }

    // Dispatches a non-null record to the projection routine matching the target type.
    private static Object projectRequiredSchema(Schema source, Object record, Schema target) throws SchemaProjectorException {
        switch (target.type()) {
            case INT8:
            case INT16:
            case INT32:
            case INT64:
            case FLOAT32:
            case FLOAT64:
            case BOOLEAN:
            case BYTES:
            case STRING:
                return projectPrimitive(source, record, target);
            case STRUCT:
                return projectStruct(source, (Struct) record, target);
            case ARRAY:
                return projectArray(source, record, target);
            case MAP:
                return projectMap(source, record, target);
        }
        return null;
    }

    // Builds a Struct for the target schema, projecting each target field from the source field with the same name.
    private static Object projectStruct(Schema source, Struct sourceStruct, Schema target) throws SchemaProjectorException {
        Struct targetStruct = new Struct(target);
        for (Field targetField : target.fields()) {
            String fieldName = targetField.name();
            Field sourceField = source.field(fieldName);
            if (sourceField != null) {
                Object sourceFieldValue = sourceStruct.get(fieldName);
                try {
                    Object targetFieldValue = project(sourceField.schema(), sourceFieldValue, targetField.schema());
                    targetStruct.put(fieldName, targetFieldValue);
                } catch (SchemaProjectorException e) {
                    throw new SchemaProjectorException("Error projecting " + sourceField.name(), e);
                }
            } else if (targetField.schema().defaultValue() != null) {
                // Field is new in the target schema: fill it with the declared default.
                targetStruct.put(fieldName, targetField.schema().defaultValue());
            } else if (!targetField.schema().isOptional()) {
                // Required, no default and absent from the source: there is no value to use.
                throw new SchemaProjectorException("Cannot project " + source.schema() + " to " + target.schema()
                        + ": required field '" + fieldName + "' is missing from the source schema and has no default value");
            }
            // Otherwise the field is optional without a default and simply stays unset (null).
        }
        return targetStruct;
    }


    // Cheap up-front compatibility check: type (allowing numeric promotion), name and parameters must agree.
    private static void checkMaybeCompatible(Schema source, Schema target) {
        if (source.type() != target.type() && !isPromotable(source.type(), target.type())) {
            throw new SchemaProjectorException("Schema type mismatch. source type: " + source.type() + " and target type: " + target.type());
        } else if (!Objects.equals(source.name(), target.name())) {
            throw new SchemaProjectorException("Schema name mismatch. source name: " + source.name() + " and target name: " + target.name());
        } else if (!Objects.equals(source.parameters(), target.parameters())) {
            throw new SchemaProjectorException("Schema parameters not equal. source parameters: " + source.parameters() + " and target parameters: " + target.parameters());
        }
    }

    // Projects every element of a List using the source/target element schemas.
    private static Object projectArray(Schema source, Object record, Schema target) throws SchemaProjectorException {
        List<?> array = (List<?>) record;
        List<Object> retArray = new ArrayList<>();
        for (Object entry : array) {
            retArray.add(project(source.valueSchema(), entry, target.valueSchema()));
        }
        return retArray;
    }

    // Projects every key and value of a Map using the source/target key and value schemas.
    private static Object projectMap(Schema source, Object record, Schema target) throws SchemaProjectorException {
        Map<?, ?> map = (Map<?, ?>) record;
        Map<Object, Object> retMap = new HashMap<>();
        for (Map.Entry<?, ?> entry : map.entrySet()) {
            Object key = entry.getKey();
            Object value = entry.getValue();
            Object retKey = project(source.keySchema(), key, target.keySchema());
            Object retValue = project(source.valueSchema(), value, target.valueSchema());
            retMap.put(retKey, retValue);
        }
        return retMap;
    }

    // Converts numeric values to the target's Java representation; all other primitives pass through unchanged.
    private static Object projectPrimitive(Schema source, Object record, Schema target) throws SchemaProjectorException {
        assert source.type().isPrimitive();
        assert target.type().isPrimitive();
        Object result;
        if (isPromotable(source.type(), target.type())) {
            Number numberRecord = (Number) record;
            switch (target.type()) {
                case INT8:
                    result = numberRecord.byteValue();
                    break;
                case INT16:
                    result = numberRecord.shortValue();
                    break;
                case INT32:
                    result = numberRecord.intValue();
                    break;
                case INT64:
                    result = numberRecord.longValue();
                    break;
                case FLOAT32:
                    result = numberRecord.floatValue();
                    break;
                case FLOAT64:
                    result = numberRecord.doubleValue();
                    break;
                default:
                    throw new SchemaProjectorException("Not promotable type.");
            }
        } else {
            result = record;
        }
        return result;
    }

    private static boolean isPromotable(Type sourceType, Type targetType) {
        return promotable.contains(new AbstractMap.SimpleImmutableEntry<>(sourceType, targetType));
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java deleted file mode 100644 index bd757c4..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Struct.java +++ /dev/null @@ -1,265 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and - * limitations under the License. - **/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * <p>
 *     A structured record containing a set of named fields with values, each field using an independent {@link Schema}.
 *     Struct objects must specify a complete {@link Schema} up front, and only fields specified in the Schema may be set.
 * </p>
 * <p>
 *     The Struct's {@link #put(String, Object)} method returns the Struct itself to provide a fluent API for constructing
 *     complete objects:
 *     <pre>
 *     Schema schema = SchemaBuilder.struct().name("com.example.Person")
 *         .field("name", Schema.STRING_SCHEMA).field("age", Schema.INT32_SCHEMA).build();
 *     Struct struct = new Struct(schema).put("name", "Bobby McGee").put("age", 21);
 *     </pre>
 * </p>
 */
public class Struct {

    private final Schema schema;
    // One slot per schema field, addressed by Field.index(); null means "not set".
    private final Object[] values;

    /**
     * Create a new Struct for this {@link Schema}
     * @param schema the {@link Schema} for the Struct
     * @throws DataException if the schema is not of type STRUCT
     */
    public Struct(Schema schema) {
        if (schema.type() != Schema.Type.STRUCT)
            throw new DataException("Not a struct schema: " + schema);
        this.schema = schema;
        this.values = new Object[schema.fields().size()];
    }

    /**
     * Get the schema for this Struct.
     * @return the Struct's schema
     */
    public Schema schema() {
        return schema;
    }

    /**
     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
     * must be cast to a more specific type.
     * @param fieldName the field name to lookup
     * @return the value for the field
     * @throws DataException if the schema has no field with that name
     */
    public Object get(String fieldName) {
        Field field = lookupField(fieldName);
        return get(field);
    }

    /**
     * Get the value of a field, returning the default value if no value has been set yet and a default value is specified
     * in the field's schema. Because this handles fields of all types, the value is returned as an {@link Object} and
     * must be cast to a more specific type.
     * @param field the field to lookup
     * @return the value for the field
     */
    public Object get(Field field) {
        Object val = values[field.index()];
        // The fallback must come from the field's own schema; the struct-level default is a default for the
        // whole struct, not for any individual field.
        if (val == null && field.schema().defaultValue() != null) {
            val = field.schema().defaultValue();
        }
        return val;
    }

    /**
     * Get the underlying raw value for the field without accounting for default values.
     * @param fieldName the field to get the value of
     * @return the raw value
     * @throws DataException if the schema has no field with that name
     */
    public Object getWithoutDefault(String fieldName) {
        Field field = lookupField(fieldName);
        return values[field.index()];
    }

    // Note that all getters have to have boxed return types since the fields might be optional.
    // They return the raw stored value (see getCheckType) after verifying the field's declared type.

    /**
     * Get the stored value of an INT8 field as a Byte.
     * @throws DataException if the field does not exist or is not of type INT8
     */
    public Byte getInt8(String fieldName) {
        return (Byte) getCheckType(fieldName, Schema.Type.INT8);
    }

    /**
     * Get the stored value of an INT16 field as a Short.
     * @throws DataException if the field does not exist or is not of type INT16
     */
    public Short getInt16(String fieldName) {
        return (Short) getCheckType(fieldName, Schema.Type.INT16);
    }

    /**
     * Get the stored value of an INT32 field as an Integer.
     * @throws DataException if the field does not exist or is not of type INT32
     */
    public Integer getInt32(String fieldName) {
        return (Integer) getCheckType(fieldName, Schema.Type.INT32);
    }

    /**
     * Get the stored value of an INT64 field as a Long.
     * @throws DataException if the field does not exist or is not of type INT64
     */
    public Long getInt64(String fieldName) {
        return (Long) getCheckType(fieldName, Schema.Type.INT64);
    }

    /**
     * Get the stored value of a FLOAT32 field as a Float.
     * @throws DataException if the field does not exist or is not of type FLOAT32
     */
    public Float getFloat32(String fieldName) {
        return (Float) getCheckType(fieldName, Schema.Type.FLOAT32);
    }

    /**
     * Get the stored value of a FLOAT64 field as a Double.
     * @throws DataException if the field does not exist or is not of type FLOAT64
     */
    public Double getFloat64(String fieldName) {
        return (Double) getCheckType(fieldName, Schema.Type.FLOAT64);
    }

    /**
     * Get the stored value of a BOOLEAN field as a Boolean.
     * @throws DataException if the field does not exist or is not of type BOOLEAN
     */
    public Boolean getBoolean(String fieldName) {
        return (Boolean) getCheckType(fieldName, Schema.Type.BOOLEAN);
    }

    /**
     * Get the stored value of a STRING field as a String.
     * @throws DataException if the field does not exist or is not of type STRING
     */
    public String getString(String fieldName) {
        return (String) getCheckType(fieldName, Schema.Type.STRING);
    }

    /**
     * Get the stored value of a BYTES field as a byte[]. A ByteBuffer value is returned as its backing array.
     * @throws DataException if the field does not exist or is not of type BYTES
     */
    public byte[] getBytes(String fieldName) {
        Object bytes = getCheckType(fieldName, Schema.Type.BYTES);
        if (bytes instanceof ByteBuffer)
            // NOTE(review): array() exposes the whole backing array regardless of position/limit and
            // fails for direct or read-only buffers - confirm callers only store heap buffers.
            return ((ByteBuffer) bytes).array();
        return (byte[]) bytes;
    }

    /**
     * Get the stored value of an ARRAY field as a List. The element type is not checked.
     * @throws DataException if the field does not exist or is not of type ARRAY
     */
    @SuppressWarnings("unchecked")
    public <T> List<T> getArray(String fieldName) {
        return (List<T>) getCheckType(fieldName, Schema.Type.ARRAY);
    }

    /**
     * Get the stored value of a MAP field as a Map. The key and value types are not checked.
     * @throws DataException if the field does not exist or is not of type MAP
     */
    @SuppressWarnings("unchecked")
    public <K, V> Map<K, V> getMap(String fieldName) {
        return (Map<K, V>) getCheckType(fieldName, Schema.Type.MAP);
    }

    /**
     * Get the stored value of a STRUCT field as a Struct.
     * @throws DataException if the field does not exist or is not of type STRUCT
     */
    public Struct getStruct(String fieldName) {
        return (Struct) getCheckType(fieldName, Schema.Type.STRUCT);
    }

    /**
     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
     * {@link Schema}.
     * @param fieldName the name of the field to set
     * @param value the value of the field
     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
     */
    public Struct put(String fieldName, Object value) {
        Field field = lookupField(fieldName);
        return put(field, value);
    }

    /**
     * Set the value of a field. Validates the value, throwing a {@link DataException} if it does not match the field's
     * {@link Schema}.
     * @param field the field to set
     * @param value the value of the field
     * @return the Struct, to allow chaining of {@link #put(String, Object)} calls
     */
    public Struct put(Field field, Object value) {
        CopycatSchema.validateValue(field.schema(), value);
        values[field.index()] = value;
        return this;
    }


    /**
     * Validates that this struct has filled in all the necessary data with valid values. For required fields
     * without defaults, this validates that a value has been set and has matching types/schemas. If any validation
     * fails, throws a DataException.
     */
    public void validate() {
        for (Field field : schema.fields()) {
            Schema fieldSchema = field.schema();
            Object value = values[field.index()];
            // An unset field is acceptable when it is optional or can fall back to a default.
            if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null))
                continue;
            CopycatSchema.validateValue(fieldSchema, value);
        }
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Struct struct = (Struct) o;
        // deepEquals so that array-typed values (e.g. byte[] for BYTES fields) are compared by content.
        return Objects.equals(schema, struct.schema) &&
                Arrays.deepEquals(values, struct.values);
    }

    @Override
    public int hashCode() {
        // Must stay consistent with equals(): hash array-typed values by content as well.
        return Objects.hash(schema, Arrays.deepHashCode(values));
    }

    // Resolves a field name against the schema, rejecting unknown names.
    private Field lookupField(String fieldName) {
        Field field = schema.field(fieldName);
        if (field == null)
            throw new DataException(fieldName + " is not a valid field name");
        return field;
    }

    // Get the field's value, but also check that the field matches the specified type, throwing an exception if it doesn't.
    // Used to implement the get*() methods that return typed data instead of Object.
    // Returns the raw stored value; defaults are not applied here.
    private Object getCheckType(String fieldName, Schema.Type type) {
        Field field = lookupField(fieldName);
        if (field.schema().type() != type)
            throw new DataException("Field '" + fieldName + "' is not of type " + type);
        return values[field.index()];
    }

}

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java deleted file mode 100644 index e3255e0..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;

import java.util.Calendar;
import java.util.TimeZone;

/**
 * <p>
 *     A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
 *     java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
 *     point in time during the first day after the Unix epoch. The underlying representation is an integer
 *     representing the number of milliseconds after midnight.
 * </p>
 */
public class Time {
    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Time";

    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;

    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    /**
     * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
     * as required/optional, default value, and documentation.
     * @return a SchemaBuilder
     */
    public static SchemaBuilder builder() {
        return SchemaBuilder.int32()
                .name(LOGICAL_NAME)
                .version(1);
    }

    public static final Schema SCHEMA = builder().schema();

    /**
     * Convert a value from its logical format (Time) to its encoded format.
     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
     * @param value the logical value
     * @return the encoded value: milliseconds after midnight, in the range [0, 86400000)
     * @throws DataException if the schema is not a Time schema or the value lies outside the first day after the epoch
     */
    public static int fromLogical(Schema schema, java.util.Date value) {
        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
            throw new DataException("Requested conversion of Time object but the schema does not match.");
        Calendar calendar = Calendar.getInstance(UTC);
        calendar.setTime(value);
        long unixMillis = calendar.getTimeInMillis();
        // Exactly MILLIS_PER_DAY is midnight of the *following* day, so the upper bound is exclusive.
        if (unixMillis < 0 || unixMillis >= MILLIS_PER_DAY) {
            throw new DataException("Copycat Time type should not have any date fields set to non-zero values.");
        }
        return (int) unixMillis;
    }

    /**
     * Convert a value from its encoded format (milliseconds after midnight) to its logical format (Time).
     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
     * @param value the encoded value, in the range [0, 86400000)
     * @return a java.util.Date that many milliseconds after the Unix epoch
     * @throws DataException if the schema is not a Time schema or the value is out of range
     */
    public static java.util.Date toLogical(Schema schema, int value) {
        if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
            throw new DataException("Requested conversion of Time object but the schema does not match.");
        // Same half-open range as fromLogical so that every accepted value round-trips.
        if (value < 0 || value >= MILLIS_PER_DAY)
            throw new DataException("Time values must use number of milliseconds greater than or equal to 0 and less than 86400000");
        return new java.util.Date(value);
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java deleted file mode 100644 index 62d371c..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Timestamp.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;

import java.util.TimeZone;

/**
 * <p>
 *     A timestamp representing an absolute time, without timezone information. The corresponding Java type is a
 *     java.util.Date. The underlying representation is a long representing the number of milliseconds since Unix epoch.
 * </p>
 */
public class Timestamp {
    public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Timestamp";

    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

    /**
     * Creates a SchemaBuilder preconfigured for the Timestamp logical type: an int64 schema named
     * {@link #LOGICAL_NAME} at version 1. Callers may keep chaining to adjust settings such as
     * required/optional, default value, and documentation.
     * @return a SchemaBuilder
     */
    public static SchemaBuilder builder() {
        SchemaBuilder timestampBuilder = SchemaBuilder.int64();
        timestampBuilder.name(LOGICAL_NAME);
        timestampBuilder.version(1);
        return timestampBuilder;
    }

    public static final Schema SCHEMA = builder().schema();

    /**
     * Encode a logical Timestamp value (a java.util.Date) as milliseconds since the Unix epoch.
     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
     * @param value the logical value
     * @return the encoded value
     */
    public static long fromLogical(Schema schema, java.util.Date value) {
        requireTimestampSchema(schema);
        return value.getTime();
    }

    /**
     * Decode milliseconds since the Unix epoch into the logical Timestamp value (a java.util.Date).
     * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
     * @param value the encoded value
     * @return the logical value
     */
    public static java.util.Date toLogical(Schema schema, long value) {
        requireTimestampSchema(schema);
        return new java.util.Date(value);
    }

    // Rejects any schema whose name is missing or differs from the Timestamp logical name.
    private static void requireTimestampSchema(Schema schema) {
        String schemaName = schema.name();
        boolean matches = schemaName != null && schemaName.equals(LOGICAL_NAME);
        if (!matches)
            throw new DataException("Requested conversion of Timestamp object but the schema does not match.");
    }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java deleted file mode 100644 index c8f1bad..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/CopycatException.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- **/ - -package org.apache.kafka.copycat.errors; - -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.annotation.InterfaceStability; - -/** - * CopycatException is the top-level exception type generated by Copycat and connectors. - */ -@InterfaceStability.Unstable -public class CopycatException extends KafkaException { - - public CopycatException(String s) { - super(s); - } - - public CopycatException(String s, Throwable throwable) { - super(s, throwable); - } - - public CopycatException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java deleted file mode 100644 index 11139a4..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/DataException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.errors; - -/** - * Base class for all Copycat data API exceptions. - */ -public class DataException extends CopycatException { - public DataException(String s) { - super(s); - } - - public DataException(String s, Throwable throwable) { - super(s, throwable); - } - - public DataException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java deleted file mode 100644 index 6f9f233..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/IllegalWorkerStateException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.errors; - -/** - * Indicates that a method has been invoked illegally or at an invalid time by a connector or task. 
- */ -public class IllegalWorkerStateException extends CopycatException { - public IllegalWorkerStateException(String s) { - super(s); - } - - public IllegalWorkerStateException(String s, Throwable throwable) { - super(s, throwable); - } - - public IllegalWorkerStateException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java deleted file mode 100644 index b5a93af..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaBuilderException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.errors; - -public class SchemaBuilderException extends DataException { - public SchemaBuilderException(String s) { - super(s); - } - - public SchemaBuilderException(String s, Throwable throwable) { - super(s, throwable); - } - - public SchemaBuilderException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java b/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java deleted file mode 100644 index be21418..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/errors/SchemaProjectorException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- **/ -package org.apache.kafka.copycat.errors; - -public class SchemaProjectorException extends DataException { - public SchemaProjectorException(String s) { - super(s); - } - - public SchemaProjectorException(String s, Throwable throwable) { - super(s, throwable); - } - - public SchemaProjectorException(Throwable throwable) { - super(throwable); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java deleted file mode 100644 index fb2e694..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkConnector.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.sink; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.Connector; - -/** - * SinkConnectors implement the Connector interface to send Kafka data to another system. 
- */ -@InterfaceStability.Unstable -public abstract class SinkConnector extends Connector { - - /** - * <p> - * Configuration key for the list of input topics for this connector. - * </p> - * <p> - * Usually this setting is only relevant to the Copycat framework, but is provided here for - * the convenience of Connector developers if they also need to know the set of topics. - * </p> - */ - public static final String TOPICS_CONFIG = "topics"; - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java ---------------------------------------------------------------------- diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java deleted file mode 100644 index 79ac725..0000000 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/sink/SinkRecord.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ - -package org.apache.kafka.copycat.sink; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.copycat.connector.CopycatRecord; -import org.apache.kafka.copycat.data.Schema; - -/** - * SinkRecord is a CopycatRecord that has been read from Kafka and includes the kafkaOffset of - * the record in the Kafka topic-partition in addition to the standard fields. This information - * should be used by the SinkTask to coordinate kafkaOffset commits. - */ -@InterfaceStability.Unstable -public class SinkRecord extends CopycatRecord { - private final long kafkaOffset; - - public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset) { - super(topic, partition, keySchema, key, valueSchema, value); - this.kafkaOffset = kafkaOffset; - } - - public long kafkaOffset() { - return kafkaOffset; - } - - @Override - public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - if (!super.equals(o)) - return false; - - SinkRecord that = (SinkRecord) o; - - if (kafkaOffset != that.kafkaOffset) - return false; - - return true; - } - - @Override - public int hashCode() { - int result = super.hashCode(); - result = 31 * result + (int) (kafkaOffset ^ (kafkaOffset >>> 32)); - return result; - } - - @Override - public String toString() { - return "SinkRecord{" + - "kafkaOffset=" + kafkaOffset + - "} " + super.toString(); - } -}