http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java deleted file mode 100644 index ca8f029..0000000 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java +++ /dev/null @@ -1,735 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.json; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.kafka.common.cache.Cache; -import org.apache.kafka.common.cache.LRUCache; -import org.apache.kafka.common.cache.SynchronizedCache; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.copycat.data.*; -import org.apache.kafka.copycat.errors.DataException; -import org.apache.kafka.copycat.storage.Converter; - -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; - -/** - * Implementation of Converter that uses JSON to store schemas and objects. - */ -public class JsonConverter implements Converter { - private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable"; - private static final boolean SCHEMAS_ENABLE_DEFAULT = true; - private static final String SCHEMAS_CACHE_CONFIG = "schemas.cache.size"; - private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000; - - private static final HashMap<Schema.Type, JsonToCopycatTypeConverter> TO_COPYCAT_CONVERTERS = new HashMap<>(); - - private static Object checkOptionalAndDefault(Schema schema) { - if (schema.defaultValue() != null) - return schema.defaultValue(); - if (schema.isOptional()) - return null; - throw new DataException("Invalid null value for required field"); - } - - static { - TO_COPYCAT_CONVERTERS.put(Schema.Type.BOOLEAN, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.booleanValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.INT8, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return (byte) value.intValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.INT16, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return (short) value.intValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.INT32, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.intValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.INT64, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.longValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT32, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.floatValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.FLOAT64, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.doubleValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.BYTES, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - try { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.binaryValue(); - } catch (IOException e) { - throw new DataException("Invalid bytes field", e); - } - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.STRING, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - return value.textValue(); - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.ARRAY, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - - Schema elemSchema = schema == null ? null : schema.valueSchema(); - ArrayList<Object> result = new ArrayList<>(); - for (JsonNode elem : value) { - result.add(convertToCopycat(elemSchema, elem)); - } - return result; - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.MAP, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - - Schema keySchema = schema == null ? null : schema.keySchema(); - Schema valueSchema = schema == null ? null : schema.valueSchema(); - - // If the map uses strings for keys, it should be encoded in the natural JSON format. If it uses other - // primitive types or a complex type as a key, it will be encoded as a list of pairs. If we don't have a - // schema, we default to encoding in a Map. - Map<Object, Object> result = new HashMap<>(); - if (schema == null || keySchema.type() == Schema.Type.STRING) { - if (!value.isObject()) - throw new DataException("Map's with string fields should be encoded as JSON objects, but found " + value.getNodeType()); - Iterator<Map.Entry<String, JsonNode>> fieldIt = value.fields(); - while (fieldIt.hasNext()) { - Map.Entry<String, JsonNode> entry = fieldIt.next(); - result.put(entry.getKey(), convertToCopycat(valueSchema, entry.getValue())); - } - } else { - if (!value.isArray()) - throw new DataException("Map's with non-string fields should be encoded as JSON array of tuples, but found " + value.getNodeType()); - for (JsonNode entry : value) { - if (!entry.isArray()) - throw new DataException("Found invalid map entry instead of array tuple: " + entry.getNodeType()); - if (entry.size() != 2) - throw new DataException("Found invalid map entry, expected length 2 but found :" + entry.size()); - result.put(convertToCopycat(keySchema, entry.get(0)), - convertToCopycat(valueSchema, entry.get(1))); - } - } - return result; - } - }); - TO_COPYCAT_CONVERTERS.put(Schema.Type.STRUCT, new JsonToCopycatTypeConverter() { - @Override - public Object convert(Schema schema, JsonNode value) { - if (value.isNull()) return checkOptionalAndDefault(schema); - - if (!value.isObject()) - throw new DataException("Structs should be encoded as JSON objects, but found " + value.getNodeType()); - - // We only have ISchema here but need Schema, so we need to materialize the actual schema. Using ISchema - // avoids having to materialize the schema for non-Struct types but it cannot be avoided for Structs since - // they require a schema to be provided at construction. However, the schema is only a SchemaBuilder during - // translation of schemas to JSON; during the more common translation of data to JSON, the call to schema.schema() - // just returns the schema Object and has no overhead. - Struct result = new Struct(schema.schema()); - for (Field field : schema.fields()) - result.put(field, convertToCopycat(field.schema(), value.get(field.name()))); - - return result; - } - }); - } - - // Convert values in Copycat form into their logical types. These logical converters are discovered by logical type - // names specified in the field - private static final HashMap<String, LogicalTypeConverter> TO_COPYCAT_LOGICAL_CONVERTERS = new HashMap<>(); - static { - TO_COPYCAT_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof byte[])) - throw new DataException("Invalid type for Decimal, underlying representation should be bytes but was " + value.getClass()); - return Decimal.toLogical(schema, (byte[]) value); - } - }); - - TO_COPYCAT_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof Integer)) - throw new DataException("Invalid type for Date, underlying representation should be int32 but was " + value.getClass()); - return Date.toLogical(schema, (int) value); - } - }); - - TO_COPYCAT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof Integer)) - throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass()); - return Time.toLogical(schema, (int) value); - } - }); - - TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof Long)) - throw new DataException("Invalid type for Timestamp, underlying representation should be int64 but was " + value.getClass()); - return Timestamp.toLogical(schema, (long) value); - } - }); - } - - private static final HashMap<String, LogicalTypeConverter> TO_JSON_LOGICAL_CONVERTERS = new HashMap<>(); - static { - TO_JSON_LOGICAL_CONVERTERS.put(Decimal.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof BigDecimal)) - throw new DataException("Invalid type for Decimal, expected BigDecimal but was " + value.getClass()); - return Decimal.fromLogical(schema, (BigDecimal) value); - } - }); - - TO_JSON_LOGICAL_CONVERTERS.put(Date.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof java.util.Date)) - throw new DataException("Invalid type for Date, expected Date but was " + value.getClass()); - return Date.fromLogical(schema, (java.util.Date) value); - } - }); - - TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof java.util.Date)) - throw new DataException("Invalid type for Time, expected Date but was " + value.getClass()); - return Time.fromLogical(schema, (java.util.Date) value); - } - }); - - TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() { - @Override - public Object convert(Schema schema, Object value) { - if (!(value instanceof java.util.Date)) - throw new DataException("Invalid type for Timestamp, expected Date but was " + value.getClass()); - return Timestamp.fromLogical(schema, (java.util.Date) value); - } - }); - } - - - private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT; - private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT; - private Cache<Schema, ObjectNode> fromCopycatSchemaCache; - private Cache<JsonNode, Schema> toCopycatSchemaCache; - - private final JsonSerializer serializer = new JsonSerializer(); - private final JsonDeserializer deserializer = new JsonDeserializer(); - - @Override - public void configure(Map<String, ?> configs, boolean isKey) { - Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG); - if (enableConfigsVal != null) - enableSchemas = enableConfigsVal.toString().equals("true"); - - serializer.configure(configs, isKey); - deserializer.configure(configs, isKey); - - Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_DEFAULT); - if (cacheSizeVal != null) - cacheSize = (int) cacheSizeVal; - fromCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize)); - toCopycatSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize)); - } - - @Override - public byte[] fromCopycatData(String topic, Schema schema, Object value) { - JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value); - try { - return serializer.serialize(topic, jsonValue); - } catch (SerializationException e) { - throw new DataException("Converting Copycat data to byte[] failed due to serialization error: ", e); - } - } - - @Override - public SchemaAndValue toCopycatData(String topic, byte[] value) { - JsonNode jsonValue; - try { - jsonValue = deserializer.deserialize(topic, value); - } catch (SerializationException e) { - throw new DataException("Converting byte[] to Copycat data failed due to serialization error: ", e); - } - - if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload"))) - throw new DataException("JsonDeserializer with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields"); - - // The deserialized data should either be an envelope object containing the schema and the payload or the schema - // was stripped during serialization and we need to fill in an all-encompassing schema. - if (!enableSchemas) { - ObjectNode envelope = JsonNodeFactory.instance.objectNode(); - envelope.set("schema", null); - envelope.set("payload", jsonValue); - jsonValue = envelope; - } - - return jsonToCopycat(jsonValue); - } - - private SchemaAndValue jsonToCopycat(JsonNode jsonValue) { - if (jsonValue == null) - return SchemaAndValue.NULL; - - if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)) - throw new DataException("JSON value converted to Copycat must be in envelope containing schema"); - - Schema schema = asCopycatSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - return new SchemaAndValue(schema, convertToCopycat(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))); - } - - private ObjectNode asJsonSchema(Schema schema) { - if (schema == null) - return null; - - ObjectNode cached = fromCopycatSchemaCache.get(schema); - if (cached != null) - return cached; - - final ObjectNode jsonSchema; - switch (schema.type()) { - case BOOLEAN: - jsonSchema = JsonSchema.BOOLEAN_SCHEMA.deepCopy(); - break; - case BYTES: - jsonSchema = JsonSchema.BYTES_SCHEMA.deepCopy(); - break; - case FLOAT64: - jsonSchema = JsonSchema.DOUBLE_SCHEMA.deepCopy(); - break; - case FLOAT32: - jsonSchema = JsonSchema.FLOAT_SCHEMA.deepCopy(); - break; - case INT8: - jsonSchema = JsonSchema.INT8_SCHEMA.deepCopy(); - break; - case INT16: - jsonSchema = JsonSchema.INT16_SCHEMA.deepCopy(); - break; - case INT32: - jsonSchema = JsonSchema.INT32_SCHEMA.deepCopy(); - break; - case INT64: - jsonSchema = JsonSchema.INT64_SCHEMA.deepCopy(); - break; - case STRING: - jsonSchema = JsonSchema.STRING_SCHEMA.deepCopy(); - break; - case ARRAY: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.ARRAY_TYPE_NAME); - jsonSchema.set(JsonSchema.ARRAY_ITEMS_FIELD_NAME, asJsonSchema(schema.valueSchema())); - break; - case MAP: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.MAP_TYPE_NAME); - jsonSchema.set(JsonSchema.MAP_KEY_FIELD_NAME, asJsonSchema(schema.keySchema())); - jsonSchema.set(JsonSchema.MAP_VALUE_FIELD_NAME, asJsonSchema(schema.valueSchema())); - break; - case STRUCT: - jsonSchema = JsonNodeFactory.instance.objectNode().put(JsonSchema.SCHEMA_TYPE_FIELD_NAME, JsonSchema.STRUCT_TYPE_NAME); - ArrayNode fields = JsonNodeFactory.instance.arrayNode(); - for (Field field : schema.fields()) { - ObjectNode fieldJsonSchema = asJsonSchema(field.schema()); - fieldJsonSchema.put(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME, field.name()); - fields.add(fieldJsonSchema); - } - jsonSchema.set(JsonSchema.STRUCT_FIELDS_FIELD_NAME, fields); - break; - default: - throw new DataException("Couldn't translate unsupported schema type " + schema + "."); - } - - jsonSchema.put(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME, schema.isOptional()); - if (schema.name() != null) - jsonSchema.put(JsonSchema.SCHEMA_NAME_FIELD_NAME, schema.name()); - if (schema.version() != null) - jsonSchema.put(JsonSchema.SCHEMA_VERSION_FIELD_NAME, schema.version()); - if (schema.doc() != null) - jsonSchema.put(JsonSchema.SCHEMA_DOC_FIELD_NAME, schema.doc()); - if (schema.parameters() != null) { - ObjectNode jsonSchemaParams = JsonNodeFactory.instance.objectNode(); - for (Map.Entry<String, String> prop : schema.parameters().entrySet()) - jsonSchemaParams.put(prop.getKey(), prop.getValue()); - jsonSchema.put(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME, jsonSchemaParams); - } - if (schema.defaultValue() != null) - jsonSchema.set(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME, convertToJson(schema, schema.defaultValue())); - - fromCopycatSchemaCache.put(schema, jsonSchema); - return jsonSchema; - } - - - private Schema asCopycatSchema(JsonNode jsonSchema) { - if (jsonSchema.isNull()) - return null; - - Schema cached = toCopycatSchemaCache.get(jsonSchema); - if (cached != null) - return cached; - - JsonNode schemaTypeNode = jsonSchema.get(JsonSchema.SCHEMA_TYPE_FIELD_NAME); - if (schemaTypeNode == null || !schemaTypeNode.isTextual()) - throw new DataException("Schema must contain 'type' field"); - - final SchemaBuilder builder; - switch (schemaTypeNode.textValue()) { - case JsonSchema.BOOLEAN_TYPE_NAME: - builder = SchemaBuilder.bool(); - break; - case JsonSchema.INT8_TYPE_NAME: - builder = SchemaBuilder.int8(); - break; - case JsonSchema.INT16_TYPE_NAME: - builder = SchemaBuilder.int16(); - break; - case JsonSchema.INT32_TYPE_NAME: - builder = SchemaBuilder.int32(); - break; - case JsonSchema.INT64_TYPE_NAME: - builder = SchemaBuilder.int64(); - break; - case JsonSchema.FLOAT_TYPE_NAME: - builder = SchemaBuilder.float32(); - break; - case JsonSchema.DOUBLE_TYPE_NAME: - builder = SchemaBuilder.float64(); - break; - case JsonSchema.BYTES_TYPE_NAME: - builder = SchemaBuilder.bytes(); - break; - case JsonSchema.STRING_TYPE_NAME: - builder = SchemaBuilder.string(); - break; - case JsonSchema.ARRAY_TYPE_NAME: - JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME); - if (elemSchema == null) - throw new DataException("Array schema did not specify the element type"); - builder = SchemaBuilder.array(asCopycatSchema(elemSchema)); - break; - case JsonSchema.MAP_TYPE_NAME: - JsonNode keySchema = jsonSchema.get(JsonSchema.MAP_KEY_FIELD_NAME); - if (keySchema == null) - throw new DataException("Map schema did not specify the key type"); - JsonNode valueSchema = jsonSchema.get(JsonSchema.MAP_VALUE_FIELD_NAME); - if (valueSchema == null) - throw new DataException("Map schema did not specify the value type"); - builder = SchemaBuilder.map(asCopycatSchema(keySchema), asCopycatSchema(valueSchema)); - break; - case JsonSchema.STRUCT_TYPE_NAME: - builder = SchemaBuilder.struct(); - JsonNode fields = jsonSchema.get(JsonSchema.STRUCT_FIELDS_FIELD_NAME); - if (fields == null || !fields.isArray()) - throw new DataException("Struct schema's \"fields\" argument is not an array."); - for (JsonNode field : fields) { - JsonNode jsonFieldName = field.get(JsonSchema.STRUCT_FIELD_NAME_FIELD_NAME); - if (jsonFieldName == null || !jsonFieldName.isTextual()) - throw new DataException("Struct schema's field name not specified properly"); - builder.field(jsonFieldName.asText(), asCopycatSchema(field)); - } - break; - default: - throw new DataException("Unknown schema type: " + schemaTypeNode.textValue()); - } - - - JsonNode schemaOptionalNode = jsonSchema.get(JsonSchema.SCHEMA_OPTIONAL_FIELD_NAME); - if (schemaOptionalNode != null && schemaOptionalNode.isBoolean() && schemaOptionalNode.booleanValue()) - builder.optional(); - else - builder.required(); - - JsonNode schemaNameNode = jsonSchema.get(JsonSchema.SCHEMA_NAME_FIELD_NAME); - if (schemaNameNode != null && schemaNameNode.isTextual()) - builder.name(schemaNameNode.textValue()); - - JsonNode schemaVersionNode = jsonSchema.get(JsonSchema.SCHEMA_VERSION_FIELD_NAME); - if (schemaVersionNode != null && schemaVersionNode.isIntegralNumber()) { - builder.version(schemaVersionNode.intValue()); - } - - JsonNode schemaDocNode = jsonSchema.get(JsonSchema.SCHEMA_DOC_FIELD_NAME); - if (schemaDocNode != null && schemaDocNode.isTextual()) - builder.doc(schemaDocNode.textValue()); - - JsonNode schemaParamsNode = jsonSchema.get(JsonSchema.SCHEMA_PARAMETERS_FIELD_NAME); - if (schemaParamsNode != null && schemaParamsNode.isObject()) { - Iterator<Map.Entry<String, JsonNode>> paramsIt = schemaParamsNode.fields(); - while (paramsIt.hasNext()) { - Map.Entry<String, JsonNode> entry = paramsIt.next(); - JsonNode paramValue = entry.getValue(); - if (!paramValue.isTextual()) - throw new DataException("Schema parameters must have string values."); - builder.parameter(entry.getKey(), paramValue.textValue()); - } - } - - JsonNode schemaDefaultNode = jsonSchema.get(JsonSchema.SCHEMA_DEFAULT_FIELD_NAME); - if (schemaDefaultNode != null) - builder.defaultValue(convertToCopycat(builder, schemaDefaultNode)); - - Schema result = builder.build(); - toCopycatSchemaCache.put(jsonSchema, result); - return result; - } - - - /** - * Convert this object, in org.apache.kafka.copycat.data format, into a JSON object with an envelope object - * containing schema and payload fields. - * @param schema the schema for the data - * @param value the value - * @return JsonNode-encoded version - */ - private JsonNode convertToJsonWithEnvelope(Schema schema, Object value) { - return new JsonSchema.Envelope(asJsonSchema(schema), convertToJson(schema, value)).toJsonNode(); - } - - private JsonNode convertToJsonWithoutEnvelope(Schema schema, Object value) { - return convertToJson(schema, value); - } - - /** - * Convert this object, in the org.apache.kafka.copycat.data format, into a JSON object, returning both the schema - * and the converted object. - */ - private static JsonNode convertToJson(Schema schema, Object logicalValue) { - if (logicalValue == null) { - if (schema == null) // Any schema is valid and we don't have a default, so treat this as an optional schema - return null; - if (schema.defaultValue() != null) - return convertToJson(schema, schema.defaultValue()); - if (schema.isOptional()) - return JsonNodeFactory.instance.nullNode(); - throw new DataException("Conversion error: null value for field that is required and has no default value"); - } - - Object value = logicalValue; - if (schema != null && schema.name() != null) { - LogicalTypeConverter logicalConverter = TO_JSON_LOGICAL_CONVERTERS.get(schema.name()); - if (logicalConverter != null) - value = logicalConverter.convert(schema, logicalValue); - } - - try { - final Schema.Type schemaType; - if (schema == null) { - schemaType = CopycatSchema.schemaType(value.getClass()); - if (schemaType == null) - throw new DataException("Java class " + value.getClass() + " does not have corresponding schema type."); - } else { - schemaType = schema.type(); - } - switch (schemaType) { - case INT8: - return JsonNodeFactory.instance.numberNode((Byte) value); - case INT16: - return JsonNodeFactory.instance.numberNode((Short) value); - case INT32: - return JsonNodeFactory.instance.numberNode((Integer) value); - case INT64: - return JsonNodeFactory.instance.numberNode((Long) value); - case FLOAT32: - return JsonNodeFactory.instance.numberNode((Float) value); - case FLOAT64: - return JsonNodeFactory.instance.numberNode((Double) value); - case BOOLEAN: - return JsonNodeFactory.instance.booleanNode((Boolean) value); - case STRING: - CharSequence charSeq = (CharSequence) value; - return JsonNodeFactory.instance.textNode(charSeq.toString()); - case BYTES: - if (value instanceof byte[]) - return JsonNodeFactory.instance.binaryNode((byte[]) value); - else if (value instanceof ByteBuffer) - return JsonNodeFactory.instance.binaryNode(((ByteBuffer) value).array()); - else - throw new DataException("Invalid type for bytes type: " + value.getClass()); - case ARRAY: { - Collection collection = (Collection) value; - ArrayNode list = JsonNodeFactory.instance.arrayNode(); - for (Object elem : collection) { - Schema valueSchema = schema == null ? null : schema.valueSchema(); - JsonNode fieldValue = convertToJson(valueSchema, elem); - list.add(fieldValue); - } - return list; - } - case MAP: { - Map<?, ?> map = (Map<?, ?>) value; - // If true, using string keys and JSON object; if false, using non-string keys and Array-encoding - boolean objectMode; - if (schema == null) { - objectMode = true; - for (Map.Entry<?, ?> entry : map.entrySet()) { - if (!(entry.getKey() instanceof String)) { - objectMode = false; - break; - } - } - } else { - objectMode = schema.keySchema().type() == Schema.Type.STRING; - } - ObjectNode obj = null; - ArrayNode list = null; - if (objectMode) - obj = JsonNodeFactory.instance.objectNode(); - else - list = JsonNodeFactory.instance.arrayNode(); - for (Map.Entry<?, ?> entry : map.entrySet()) { - Schema keySchema = schema == null ? null : schema.keySchema(); - Schema valueSchema = schema == null ? null : schema.valueSchema(); - JsonNode mapKey = convertToJson(keySchema, entry.getKey()); - JsonNode mapValue = convertToJson(valueSchema, entry.getValue()); - - if (objectMode) - obj.set(mapKey.asText(), mapValue); - else - list.add(JsonNodeFactory.instance.arrayNode().add(mapKey).add(mapValue)); - } - return objectMode ? obj : list; - } - case STRUCT: { - Struct struct = (Struct) value; - if (struct.schema() != schema) - throw new DataException("Mismatching schema."); - ObjectNode obj = JsonNodeFactory.instance.objectNode(); - for (Field field : schema.fields()) { - obj.set(field.name(), convertToJson(field.schema(), struct.get(field))); - } - return obj; - } - } - - throw new DataException("Couldn't convert " + value + " to JSON."); - } catch (ClassCastException e) { - throw new DataException("Invalid type for " + schema.type() + ": " + value.getClass()); - } - } - - - private static Object convertToCopycat(Schema schema, JsonNode jsonValue) { - JsonToCopycatTypeConverter typeConverter; - final Schema.Type schemaType; - if (schema != null) { - schemaType = schema.type(); - } else { - switch (jsonValue.getNodeType()) { - case NULL: - // Special case. With no schema - return null; - case BOOLEAN: - schemaType = Schema.Type.BOOLEAN; - break; - case NUMBER: - if (jsonValue.isIntegralNumber()) - schemaType = Schema.Type.INT64; - else - schemaType = Schema.Type.FLOAT64; - break; - case ARRAY: - schemaType = Schema.Type.ARRAY; - break; - case OBJECT: - schemaType = Schema.Type.MAP; - break; - case STRING: - schemaType = Schema.Type.STRING; - break; - - case BINARY: - case MISSING: - case POJO: - default: - schemaType = null; - break; - } - } - typeConverter = TO_COPYCAT_CONVERTERS.get(schemaType); - if (typeConverter == null) - throw new DataException("Unknown schema type: " + schema.type()); - - Object converted = typeConverter.convert(schema, jsonValue); - if (schema != null && schema.name() != null) { - LogicalTypeConverter logicalConverter = TO_COPYCAT_LOGICAL_CONVERTERS.get(schema.name()); - if (logicalConverter != null) - converted = logicalConverter.convert(schema, converted); - } - return converted; - } - - - private interface JsonToCopycatTypeConverter { - Object convert(Schema schema, JsonNode value); - } - - private interface LogicalTypeConverter { - Object convert(Schema schema, Object value); - } -}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java deleted file mode 100644 index 1661754..0000000 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonDeserializer.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.json; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; - -import java.util.Map; - -/** - * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily - * structured data without having associated Java classes. This deserializer also supports Copycat schemas. - */ -public class JsonDeserializer implements Deserializer<JsonNode> { - private ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Default constructor needed by Kafka - */ - public JsonDeserializer() { - } - - @Override - public void configure(Map<String, ?> props, boolean isKey) { - } - - @Override - public JsonNode deserialize(String topic, byte[] bytes) { - if (bytes == null) - return null; - - JsonNode data; - try { - data = objectMapper.readTree(bytes); - } catch (Exception e) { - throw new SerializationException(e); - } - - return data; - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java deleted file mode 100644 index 78712f3..0000000 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSchema.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.json; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; - -public class JsonSchema { - - static final String ENVELOPE_SCHEMA_FIELD_NAME = "schema"; - static final String ENVELOPE_PAYLOAD_FIELD_NAME = "payload"; - static final String SCHEMA_TYPE_FIELD_NAME = "type"; - static final String SCHEMA_OPTIONAL_FIELD_NAME = "optional"; - static final String SCHEMA_NAME_FIELD_NAME = "name"; - static final String SCHEMA_VERSION_FIELD_NAME = "version"; - static final String SCHEMA_DOC_FIELD_NAME = "doc"; - static final String SCHEMA_PARAMETERS_FIELD_NAME = "parameters"; - static final String SCHEMA_DEFAULT_FIELD_NAME = "default"; - static final String ARRAY_ITEMS_FIELD_NAME = "items"; - static final String MAP_KEY_FIELD_NAME = "keys"; - static final String MAP_VALUE_FIELD_NAME = "values"; - static final String STRUCT_FIELDS_FIELD_NAME = "fields"; - static final String STRUCT_FIELD_NAME_FIELD_NAME = "field"; - static final String BOOLEAN_TYPE_NAME = "boolean"; - static final ObjectNode BOOLEAN_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BOOLEAN_TYPE_NAME); - static final String INT8_TYPE_NAME = "int8"; - static final ObjectNode INT8_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT8_TYPE_NAME); - static final String INT16_TYPE_NAME = "int16"; - static final ObjectNode INT16_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT16_TYPE_NAME); - static final String INT32_TYPE_NAME = "int32"; - static final ObjectNode INT32_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT32_TYPE_NAME); - static final String INT64_TYPE_NAME = "int64"; - static final ObjectNode INT64_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, INT64_TYPE_NAME); - static final String FLOAT_TYPE_NAME = "float"; - static final ObjectNode FLOAT_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, FLOAT_TYPE_NAME); - static final String DOUBLE_TYPE_NAME = "double"; - static final ObjectNode DOUBLE_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, DOUBLE_TYPE_NAME); - static final String BYTES_TYPE_NAME = "bytes"; - static final ObjectNode BYTES_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, BYTES_TYPE_NAME); - static final String STRING_TYPE_NAME = "string"; - static final ObjectNode STRING_SCHEMA = JsonNodeFactory.instance.objectNode().put(SCHEMA_TYPE_FIELD_NAME, STRING_TYPE_NAME); - static final String ARRAY_TYPE_NAME = "array"; - static final String MAP_TYPE_NAME = "map"; - static final String STRUCT_TYPE_NAME = "struct"; - - public static ObjectNode envelope(JsonNode schema, JsonNode payload) { - ObjectNode result = JsonNodeFactory.instance.objectNode(); - result.set(ENVELOPE_SCHEMA_FIELD_NAME, schema); - result.set(ENVELOPE_PAYLOAD_FIELD_NAME, payload); - return result; - } - - static class Envelope { - public JsonNode schema; - public JsonNode payload; - - public Envelope(JsonNode schema, JsonNode payload) { - this.schema = schema; - this.payload = payload; - } - - public ObjectNode toJsonNode() { - return envelope(schema, payload); - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java deleted file mode 100644 index 129d14b..0000000 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonSerializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.copycat.json; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -/** - * Serialize Jackson JsonNode tree model objects to UTF-8 JSON. Using the tree model allows handling arbitrarily - * structured data without corresponding Java classes. This serializer also supports Copycat schemas. - */ -public class JsonSerializer implements Serializer<JsonNode> { - private final ObjectMapper objectMapper = new ObjectMapper(); - - /** - * Default constructor needed by Kafka - */ - public JsonSerializer() { - - } - - @Override - public void configure(Map<String, ?> config, boolean isKey) { - } - - @Override - public byte[] serialize(String topic, JsonNode data) { - if (data == null) - return null; - - try { - return objectMapper.writeValueAsBytes(data); - } catch (Exception e) { - throw new SerializationException("Error serializing JSON message", e); - } - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java ---------------------------------------------------------------------- diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java deleted file mode 100644 index 6b40046..0000000 --- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java +++ /dev/null @@ -1,644 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.json; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import org.apache.kafka.copycat.data.Date; -import org.apache.kafka.copycat.data.Decimal; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.kafka.common.cache.Cache; -import org.apache.kafka.copycat.data.Schema; -import org.apache.kafka.copycat.data.SchemaAndValue; -import org.apache.kafka.copycat.data.SchemaBuilder; -import org.apache.kafka.copycat.data.Struct; -import org.apache.kafka.copycat.data.Time; -import org.apache.kafka.copycat.data.Timestamp; -import org.apache.kafka.copycat.errors.DataException; -import org.junit.Before; -import org.junit.Test; -import org.powermock.reflect.Whitebox; - -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collections; -import java.util.GregorianCalendar; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -public class JsonConverterTest { - private static final String TOPIC = "topic"; - - ObjectMapper objectMapper = new ObjectMapper(); - JsonConverter converter = new JsonConverter(); - - @Before - public void setUp() { - converter.configure(Collections.EMPTY_MAP, false); - } - - // Schema metadata - - @Test - public void testCopycatSchemaMetadataTranslation() { - // this validates the non-type fields are translated and handled properly - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes())); - assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes())); - assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true), - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes())); - assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").parameter("foo", "bar").build(), true), - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true }".getBytes())); - } - - // Schema types - - @Test - public void booleanToCopycat() { - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes())); - assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, false), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": false }".getBytes())); - } - - @Test - public void byteToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT8_SCHEMA, (byte) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int8\" }, \"payload\": 12 }".getBytes())); - } - - @Test - public void shortToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT16_SCHEMA, (short) 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int16\" }, \"payload\": 12 }".getBytes())); - } - - @Test - public void intToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT32_SCHEMA, 12), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int32\" }, \"payload\": 12 }".getBytes())); - } - - @Test - public void longToCopycat() { - assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 12L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 12 }".getBytes())); - assertEquals(new SchemaAndValue(Schema.INT64_SCHEMA, 4398046511104L), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"int64\" }, \"payload\": 4398046511104 }".getBytes())); - } - - @Test - public void floatToCopycat() { - assertEquals(new SchemaAndValue(Schema.FLOAT32_SCHEMA, 12.34f), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"float\" }, \"payload\": 12.34 }".getBytes())); - } - - @Test - public void doubleToCopycat() { - assertEquals(new SchemaAndValue(Schema.FLOAT64_SCHEMA, 12.34), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"double\" }, \"payload\": 12.34 }".getBytes())); - } - - - @Test - public void bytesToCopycat() throws UnsupportedEncodingException { - ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8")); - String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); - ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value()); - assertEquals(reference, converted); - } - - @Test - public void stringToCopycat() { - assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes())); - } - - @Test - public void arrayToCopycat() { - byte[] arrayJson = "{ \"schema\": { \"type\": \"array\", \"items\": { \"type\" : \"int32\" } }, \"payload\": [1, 2, 3] }".getBytes(); - assertEquals(new SchemaAndValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3)), converter.toCopycatData(TOPIC, arrayJson)); - } - - @Test - public void mapToCopycatStringKeys() { - byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes(); - Map<String, Integer> expected = new HashMap<>(); - expected.put("key1", 12); - expected.put("key2", 15); - assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson)); - } - - @Test - public void mapToCopycatNonStringKeys() { - byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes(); - Map<Integer, Integer> expected = new HashMap<>(); - expected.put(1, 12); - expected.put(2, 15); - assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toCopycatData(TOPIC, mapJson)); - } - - @Test - public void structToCopycat() { - byte[] structJson = "{ \"schema\": { \"type\": \"struct\", \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\" }, { \"field\": \"field2\", \"type\": \"string\" }] }, \"payload\": { \"field1\": true, \"field2\": \"string\" } }".getBytes(); - Schema expectedSchema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build(); - Struct expected = new Struct(expectedSchema).put("field1", true).put("field2", "string"); - SchemaAndValue converted = converter.toCopycatData(TOPIC, structJson); - assertEquals(new SchemaAndValue(expectedSchema, expected), converted); - } - - @Test(expected = DataException.class) - public void nullToCopycat() { - // When schemas are enabled, trying to decode a null should be an error -- we should *always* have the envelope - assertEquals(SchemaAndValue.NULL, converter.toCopycatData(TOPIC, null)); - } - - @Test - public void nullSchemaPrimitiveToCopycat() { - SchemaAndValue converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": null }".getBytes()); - assertEquals(SchemaAndValue.NULL, converted); - - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": true }".getBytes()); - assertEquals(new SchemaAndValue(null, true), converted); - - // Integers: Copycat has more data types, and JSON unfortunately mixes all number types. We try to preserve - // info as best we can, so we always use the largest integer and floating point numbers we can and have Jackson - // determine if it's an integer or not - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12 }".getBytes()); - assertEquals(new SchemaAndValue(null, 12L), converted); - - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": 12.24 }".getBytes()); - assertEquals(new SchemaAndValue(null, 12.24), converted); - - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": \"a string\" }".getBytes()); - assertEquals(new SchemaAndValue(null, "a string"), converted); - - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": [1, \"2\", 3] }".getBytes()); - assertEquals(new SchemaAndValue(null, Arrays.asList(1L, "2", 3L)), converted); - - converted = converter.toCopycatData(TOPIC, "{ \"schema\": null, \"payload\": { \"field1\": 1, \"field2\": 2} }".getBytes()); - Map<String, Long> obj = new HashMap<>(); - obj.put("field1", 1L); - obj.put("field2", 2L); - assertEquals(new SchemaAndValue(null, obj), converted); - } - - @Test - public void decimalToCopycat() { - Schema schema = Decimal.schema(2); - BigDecimal reference = new BigDecimal(new BigInteger("156"), 2); - // Payload is base64 encoded byte[]{0, -100}, which is the two's complement encoding of 156. - String msg = "{ \"schema\": { \"type\": \"bytes\", \"name\": \"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }, \"payload\": \"AJw=\" }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); - BigDecimal converted = (BigDecimal) schemaAndValue.value(); - assertEquals(schema, schemaAndValue.schema()); - assertEquals(reference, converted); - } - - @Test - public void dateToCopycat() { - Schema schema = Date.SCHEMA; - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.DATE, 10000); - java.util.Date reference = calendar.getTime(); - String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Date\", \"version\": 1 }, \"payload\": 10000 }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); - java.util.Date converted = (java.util.Date) schemaAndValue.value(); - assertEquals(schema, schemaAndValue.schema()); - assertEquals(reference, converted); - } - - @Test - public void timeToCopycat() { - Schema schema = Time.SCHEMA; - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.MILLISECOND, 14400000); - java.util.Date reference = calendar.getTime(); - String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }, \"payload\": 14400000 }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); - java.util.Date converted = (java.util.Date) schemaAndValue.value(); - assertEquals(schema, schemaAndValue.schema()); - assertEquals(reference, converted); - } - - @Test - public void timestampToCopycat() { - Schema schema = Timestamp.SCHEMA; - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.MILLISECOND, 2000000000); - calendar.add(Calendar.MILLISECOND, 2000000000); - java.util.Date reference = calendar.getTime(); - String msg = "{ \"schema\": { \"type\": \"int64\", \"name\": \"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }, \"payload\": 4000000000 }"; - SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); - java.util.Date converted = (java.util.Date) schemaAndValue.value(); - assertEquals(schema, schemaAndValue.schema()); - assertEquals(reference, converted); - } - - // Schema metadata - - @Test - public void testJsonSchemaMetadataTranslation() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - - converted = parse(converter.fromCopycatData(TOPIC, Schema.OPTIONAL_BOOLEAN_SCHEMA, null)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"boolean\", \"optional\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isNull()); - - converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().defaultValue(true).build(), true)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"default\": true }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - - converted = parse(converter.fromCopycatData(TOPIC, SchemaBuilder.bool().required().name("bool").version(3).doc("the documentation").parameter("foo", "bar").build(), true)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 3, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - } - - - @Test - public void testCacheSchemaToCopycatConversion() { - Cache<JsonNode, Schema> cache = Whitebox.getInternalState(converter, "toCopycatSchemaCache"); - assertEquals(0, cache.size()); - - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()); - assertEquals(1, cache.size()); - - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()); - assertEquals(1, cache.size()); - - // Different schema should also get cached - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": true }".getBytes()); - assertEquals(2, cache.size()); - - // Even equivalent, but different JSON encoding of schema, should get different cache entry - converter.toCopycatData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false }, \"payload\": true }".getBytes()); - assertEquals(3, cache.size()); - } - - // Schema types - - @Test - public void booleanToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BOOLEAN_SCHEMA, true)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"boolean\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - } - - @Test - public void byteToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT8_SCHEMA, (byte) 12)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int8\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); - } - - @Test - public void shortToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT16_SCHEMA, (short) 12)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int16\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); - } - - @Test - public void intToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT32_SCHEMA, 12)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int32\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(12, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).intValue()); - } - - @Test - public void longToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.INT64_SCHEMA, 4398046511104L)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int64\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(4398046511104L, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).longValue()); - } - - @Test - public void floatToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT32_SCHEMA, 12.34f)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"float\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(12.34f, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).floatValue(), 0.001); - } - - @Test - public void doubleToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, 12.34)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"double\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(12.34, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).doubleValue(), 0.001); - } - - @Test - public void bytesToJson() throws IOException { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.BYTES_SCHEMA, "test-string".getBytes())); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(ByteBuffer.wrap("test-string".getBytes()), - ByteBuffer.wrap(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue())); - } - - @Test - public void stringToJson() { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Schema.STRING_SCHEMA, "test-string")); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue()); - } - - @Test - public void arrayToJson() { - Schema int32Array = SchemaBuilder.array(Schema.INT32_SCHEMA).build(); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, int32Array, Arrays.asList(1, 2, 3))); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"array\", \"items\": { \"type\": \"int32\", \"optional\": false }, \"optional\": false }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add(2).add(3), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - @Test - public void mapToJsonStringKeys() { - Schema stringIntMap = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(); - Map<String, Integer> input = new HashMap<>(); - input.put("key1", 12); - input.put("key2", 15); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, stringIntMap, input)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"string\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", 15), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - @Test - public void mapToJsonNonStringKeys() { - Schema intIntMap = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(); - Map<Integer, Integer> input = new HashMap<>(); - input.put(1, 12); - input.put(2, 15); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, intIntMap, input)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"map\", \"keys\": { \"type\" : \"int32\", \"optional\": false }, \"values\": { \"type\" : \"int32\", \"optional\": false }, \"optional\": false }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - - assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray()); - ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); - assertEquals(2, payload.size()); - Set<JsonNode> payloadEntries = new HashSet<>(); - for (JsonNode elem : payload) - payloadEntries.add(elem); - assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add(1).add(12), - JsonNodeFactory.instance.arrayNode().add(2).add(15))), - payloadEntries - ); - } - - @Test - public void structToJson() { - Schema schema = SchemaBuilder.struct().field("field1", Schema.BOOLEAN_SCHEMA).field("field2", Schema.STRING_SCHEMA).build(); - Struct input = new Struct(schema).put("field1", true).put("field2", "string"); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, schema, input)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"struct\", \"optional\": false, \"fields\": [{ \"field\": \"field1\", \"type\": \"boolean\", \"optional\": false }, { \"field\": \"field2\", \"type\": \"string\", \"optional\": false }] }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertEquals(JsonNodeFactory.instance.objectNode() - .put("field1", true) - .put("field2", "string"), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - - @Test - public void decimalToJson() throws IOException { - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2))); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"bytes\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Decimal\", \"version\": 1, \"parameters\": { \"scale\": \"2\" } }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertArrayEquals(new byte[]{0, -100}, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).binaryValue()); - } - - @Test - public void dateToJson() throws IOException { - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.DATE, 10000); - java.util.Date date = calendar.getTime(); - - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Date.SCHEMA, date)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Date\", \"version\": 1 }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); - assertTrue(payload.isInt()); - assertEquals(10000, payload.intValue()); - } - - @Test - public void timeToJson() throws IOException { - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.MILLISECOND, 14400000); - java.util.Date date = calendar.getTime(); - - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Time.SCHEMA, date)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); - assertTrue(payload.isInt()); - assertEquals(14400000, payload.longValue()); - } - - @Test - public void timestampToJson() throws IOException { - GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); - calendar.setTimeZone(TimeZone.getTimeZone("UTC")); - calendar.add(Calendar.MILLISECOND, 2000000000); - calendar.add(Calendar.MILLISECOND, 2000000000); - java.util.Date date = calendar.getTime(); - - JsonNode converted = parse(converter.fromCopycatData(TOPIC, Timestamp.SCHEMA, date)); - validateEnvelope(converted); - assertEquals(parse("{ \"type\": \"int64\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Timestamp\", \"version\": 1 }"), - converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); - assertTrue(payload.isLong()); - assertEquals(4000000000L, payload.longValue()); - } - - - @Test - public void nullSchemaAndPrimitiveToJson() { - // This still needs to do conversion of data, null schema means "anything goes" - JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true)); - validateEnvelopeNullSchema(converted); - assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); - assertEquals(true, converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).booleanValue()); - } - - @Test - public void nullSchemaAndArrayToJson() { - // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match - // types to verify conversion still works. - JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, Arrays.asList(1, "string", true))); - validateEnvelopeNullSchema(converted); - assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); - assertEquals(JsonNodeFactory.instance.arrayNode().add(1).add("string").add(true), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - @Test - public void nullSchemaAndMapToJson() { - // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match - // types to verify conversion still works. - Map<String, Object> input = new HashMap<>(); - input.put("key1", 12); - input.put("key2", "string"); - input.put("key3", true); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input)); - validateEnvelopeNullSchema(converted); - assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); - assertEquals(JsonNodeFactory.instance.objectNode().put("key1", 12).put("key2", "string").put("key3", true), - converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - @Test - public void nullSchemaAndMapNonStringKeysToJson() { - // This still needs to do conversion of data, null schema means "anything goes". Make sure we mix and match - // types to verify conversion still works. - Map<Object, Object> input = new HashMap<>(); - input.put("string", 12); - input.put(52, "string"); - input.put(false, true); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, input)); - validateEnvelopeNullSchema(converted); - assertTrue(converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); - assertTrue(converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).isArray()); - ArrayNode payload = (ArrayNode) converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); - assertEquals(3, payload.size()); - Set<JsonNode> payloadEntries = new HashSet<>(); - for (JsonNode elem : payload) - payloadEntries.add(elem); - assertEquals(new HashSet<>(Arrays.asList(JsonNodeFactory.instance.arrayNode().add("string").add(12), - JsonNodeFactory.instance.arrayNode().add(52).add("string"), - JsonNodeFactory.instance.arrayNode().add(false).add(true))), - payloadEntries - ); - } - - - @Test(expected = DataException.class) - public void mismatchSchemaJson() { - // If we have mismatching schema info, we should properly convert to a DataException - converter.fromCopycatData(TOPIC, Schema.FLOAT64_SCHEMA, true); - } - - - - @Test - public void noSchemaToCopycat() { - Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false); - converter.configure(props, true); - assertEquals(new SchemaAndValue(null, true), converter.toCopycatData(TOPIC, "true".getBytes())); - } - - @Test - public void noSchemaToJson() { - Map<String, Boolean> props = Collections.singletonMap("schemas.enable", false); - converter.configure(props, true); - JsonNode converted = parse(converter.fromCopycatData(TOPIC, null, true)); - assertTrue(converted.isBoolean()); - assertEquals(true, converted.booleanValue()); - } - - @Test - public void testCacheSchemaToJsonConversion() { - Cache<Schema, ObjectNode> cache = Whitebox.getInternalState(converter, "fromCopycatSchemaCache"); - assertEquals(0, cache.size()); - - // Repeated conversion of the same schema, even if the schema object is different should return the same Java - // object - converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true); - assertEquals(1, cache.size()); - - converter.fromCopycatData(TOPIC, SchemaBuilder.bool().build(), true); - assertEquals(1, cache.size()); - - // Validate that a similar, but different schema correctly returns a different schema. - converter.fromCopycatData(TOPIC, SchemaBuilder.bool().optional().build(), true); - assertEquals(2, cache.size()); - } - - - private JsonNode parse(byte[] json) { - try { - return objectMapper.readTree(json); - } catch (IOException e) { - fail("IOException during JSON parse: " + e.getMessage()); - throw new RuntimeException("failed"); - } - } - - private JsonNode parse(String json) { - try { - return objectMapper.readTree(json); - } catch (IOException e) { - fail("IOException during JSON parse: " + e.getMessage()); - throw new RuntimeException("failed"); - } - } - - private void validateEnvelope(JsonNode env) { - assertNotNull(env); - assertTrue(env.isObject()); - assertEquals(2, env.size()); - assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isObject()); - assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } - - private void validateEnvelopeNullSchema(JsonNode env) { - assertNotNull(env); - assertTrue(env.isObject()); - assertEquals(2, env.size()); - assertTrue(env.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); - assertTrue(env.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME).isNull()); - assertTrue(env.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java deleted file mode 100644 index 8dfefaa..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.cli; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.runtime.Copycat; -import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.distributed.DistributedConfig; -import org.apache.kafka.copycat.runtime.distributed.DistributedHerder; -import org.apache.kafka.copycat.runtime.rest.RestServer; -import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Map; - -/** - * <p> - * Command line utility that runs Copycat in distributed mode. In this mode, the process joints a group of other workers - * and work is distributed among them. This is useful for running Copycat as a service, where connectors can be - * submitted to the cluster to be automatically executed in a scalable, distributed fashion. This also allows you to - * easily scale out horizontally, elastically adding or removing capacity simply by starting or stopping worker - * instances. - * </p> - */ -@InterfaceStability.Unstable -public class CopycatDistributed { - private static final Logger log = LoggerFactory.getLogger(CopycatDistributed.class); - - public static void main(String[] args) throws Exception { - if (args.length < 1) { - log.info("Usage: CopycatDistributed worker.properties"); - System.exit(1); - } - - String workerPropsFile = args[0]; - Map<String, String> workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); - - DistributedConfig config = new DistributedConfig(workerProps); - Worker worker = new Worker(config, new KafkaOffsetBackingStore()); - RestServer rest = new RestServer(config); - DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl()); - final Copycat copycat = new Copycat(worker, herder, rest); - copycat.start(); - - // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request - copycat.awaitStop(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java deleted file mode 100644 index 3869552..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.cli; - -import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.copycat.runtime.ConnectorConfig; -import org.apache.kafka.copycat.runtime.Copycat; -import org.apache.kafka.copycat.runtime.Herder; -import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.rest.RestServer; -import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo; -import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig; -import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder; -import org.apache.kafka.copycat.storage.FileOffsetBackingStore; -import org.apache.kafka.copycat.util.Callback; -import org.apache.kafka.copycat.util.FutureCallback; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.Collections; -import java.util.Map; - -/** - * <p> - * Command line utility that runs Copycat as a standalone process. In this mode, work is not - * distributed. Instead, all the normal Copycat machinery works within a single process. This is - * useful for ad hoc, small, or experimental jobs. - * </p> - * <p> - * By default, no job configs or offset data is persistent. You can make jobs persistent and - * fault tolerant by overriding the settings to use file storage for both. - * </p> - */ -@InterfaceStability.Unstable -public class CopycatStandalone { - private static final Logger log = LoggerFactory.getLogger(CopycatStandalone.class); - - public static void main(String[] args) throws Exception { - - if (args.length < 2) { - log.info("Usage: CopycatStandalone worker.properties connector1.properties [connector2.properties ...]"); - System.exit(1); - } - - String workerPropsFile = args[0]; - Map<String, String> workerProps = !workerPropsFile.isEmpty() ? - Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap(); - - StandaloneConfig config = new StandaloneConfig(workerProps); - Worker worker = new Worker(config, new FileOffsetBackingStore()); - RestServer rest = new RestServer(config); - Herder herder = new StandaloneHerder(worker); - final Copycat copycat = new Copycat(worker, herder, rest); - copycat.start(); - - try { - for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) { - Map<String, String> connectorProps = Utils.propsToStringMap(Utils.loadProps(connectorPropsFile)); - FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() { - @Override - public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) { - if (error != null) - log.error("Failed to create job for {}", connectorPropsFile); - else - log.info("Created connector {}", info.result().name()); - } - }); - herder.putConnectorConfig( - connectorProps.get(ConnectorConfig.NAME_CONFIG), - connectorProps, false, cb); - cb.get(); - } - } catch (Throwable t) { - log.error("Stopping after connector error", t); - copycat.stop(); - } - - // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request - copycat.awaitStop(); - } -}