Author: blue
Date: Wed May 27 04:48:58 2015
New Revision: 1681906
URL: http://svn.apache.org/r1681906
Log:
AVRO-1497: Add LogicalType implementation.
This closes PR #29 and includes the following commits:
ca1d2b1 AVRO-1497: Add LogicalTypes and read-side implementation.
442a917 AVRO-1497: Add logical type support to schema reflection.
ec8d6d4 AVRO-1497: Add logical type writes to generic and reflect.
e6e9761 AVRO-1497: Clean up Conversion and LogicalType classes.
8fe954a AVRO-1497: Add Conversion and LogicalType javadoc.
3abf042 AVRO-1497: Fix ByteBuffer bug in DecimalConversion.
2293a18 AVRO-1497: Fix review items.
207afd3 AVRO-1497: Add logical type registration and record test.
d2377b2 AVRO-1497: Maven CLI and checkstyle fixes.
1e628b2 AVRO-1497: Fix test failures.
15ed857 AVRO-1497: Fix performance issues with logical types.
fb84364 AVRO-1497: Check logical type once per array.
7de6edc AVRO-1497: Remove unnecessary changes to Schema.
Added:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/reflect/TestReflectLogicalTypes.java
Modified:
avro/trunk/CHANGES.txt
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
Modified: avro/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/avro/trunk/CHANGES.txt?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/CHANGES.txt (original)
+++ avro/trunk/CHANGES.txt Wed May 27 04:48:58 2015
@@ -42,6 +42,8 @@ Trunk (not yet released)
AVRO-680. Java: Support non-string map keys. (Sachin Goyal via Ryan Blue).
+ AVRO-1497. Java: Add support for logical types. (blue)
+
OPTIMIZATIONS
IMPROVEMENTS
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversion.java Wed
May 27 04:48:58 2015
@@ -0,0 +1,171 @@
+package org.apache.avro;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.IndexedRecord;
+
+/**
+ * Conversion between generic and logical type instances.
+ * <p>
+ * Instances of this class are added to GenericData to convert a logical type
+ * to a particular representation.
+ * <p>
+ * Implementations must provide:
+ * * {@link #getConvertedType()}: get the Java class used for the logical type
+ * * {@link #getLogicalTypeName()}: get the logical type this implements
+ * <p>
+ * Subclasses must also override all of the conversion methods for Avro's base
+ * types that are valid for the logical type, or else risk causing
+ * {@code UnsupportedOperationException} at runtime.
+ * <p>
+ * Optionally, use {@link #getRecommendedSchema()} to provide a Schema that
+ * will be used when a Schema is generated for the class returned by
+ * {@code getConvertedType}.
+ *
+ * @param <T> a Java type that generic data is converted to
+ */
+public abstract class Conversion<T> {
+
+ /**
+ * Return the Java class representing the logical type.
+ *
+ * @return a Java class returned by from methods and accepted by to methods
+ */
+ public abstract Class<T> getConvertedType();
+
+ /**
+ * Return the logical type this class converts.
+ *
+ * @return a String logical type name
+ */
+ public abstract String getLogicalTypeName();
+
+ public Schema getRecommendedSchema() {
+ throw new UnsupportedOperationException(
+ "No recommended schema for " + getLogicalTypeName());
+ }
+
+ public T fromBoolean(Boolean value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromBoolean is not supported for " + type.getName());
+ }
+
+ public T fromInt(Integer value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromInt is not supported for " + type.getName());
+ }
+
+ public T fromLong(Long value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromLong is not supported for " + type.getName());
+ }
+
+ public T fromFloat(Float value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromFloat is not supported for " + type.getName());
+ }
+
+ public T fromDouble(Double value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromDouble is not supported for " + type.getName());
+ }
+
+ public T fromCharSequence(CharSequence value, Schema schema, LogicalType
type) {
+ throw new UnsupportedOperationException(
+ "fromCharSequence is not supported for " + type.getName());
+ }
+
+ public T fromEnumSymbol(GenericEnumSymbol value, Schema schema, LogicalType
type) {
+ throw new UnsupportedOperationException(
+ "fromEnumSymbol is not supported for " + type.getName());
+ }
+
+ public T fromFixed(GenericFixed value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromFixed is not supported for " + type.getName());
+ }
+
+ public T fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromBytes is not supported for " + type.getName());
+ }
+
+ public T fromArray(Collection<?> value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromArray is not supported for " + type.getName());
+ }
+
+ public T fromMap(Map<?, ?> value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromMap is not supported for " + type.getName());
+ }
+
+ public T fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "fromRecord is not supported for " + type.getName());
+ }
+
+ public Boolean toBoolean(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toBoolean is not supported for " + type.getName());
+ }
+
+ public Integer toInt(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toInt is not supported for " + type.getName());
+ }
+
+ public Long toLong(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toLong is not supported for " + type.getName());
+ }
+
+ public Float toFloat(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toFloat is not supported for " + type.getName());
+ }
+
+ public Double toDouble(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toDouble is not supported for " + type.getName());
+ }
+
+ public CharSequence toCharSequence(T value, Schema schema, LogicalType type)
{
+ throw new UnsupportedOperationException(
+ "toCharSequence is not supported for " + type.getName());
+ }
+
+ public GenericEnumSymbol toEnumSymbol(T value, Schema schema, LogicalType
type) {
+ throw new UnsupportedOperationException(
+ "toEnumSymbol is not supported for " + type.getName());
+ }
+
+ public GenericFixed toFixed(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toFixed is not supported for " + type.getName());
+ }
+
+ public ByteBuffer toBytes(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toBytes is not supported for " + type.getName());
+ }
+
+ public Collection<?> toArray(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toArray is not supported for " + type.getName());
+ }
+
+ public Map<?, ?> toMap(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toMap is not supported for " + type.getName());
+ }
+
+ public IndexedRecord toRecord(T value, Schema schema, LogicalType type) {
+ throw new UnsupportedOperationException(
+ "toRecord is not supported for " + type.getName());
+ }
+
+}
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Conversions.java
Wed May 27 04:48:58 2015
@@ -0,0 +1,105 @@
+package org.apache.avro;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
+
+public class Conversions {
+
+ public static class UUIDConversion extends Conversion<UUID> {
+ @Override
+ public Class<UUID> getConvertedType() {
+ return UUID.class;
+ }
+
+ @Override
+ public Schema getRecommendedSchema() {
+ return
LogicalTypes.uuid().addToSchema(Schema.create(Schema.Type.STRING));
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "uuid";
+ }
+
+ @Override
+ public UUID fromCharSequence(CharSequence value, Schema schema,
LogicalType type) {
+ return UUID.fromString(value.toString());
+ }
+
+ @Override
+ public CharSequence toCharSequence(UUID value, Schema schema, LogicalType
type) {
+ return value.toString();
+ }
+ }
+
+ public static class DecimalConversion extends Conversion<BigDecimal> {
+ @Override
+ public Class<BigDecimal> getConvertedType() {
+ return BigDecimal.class;
+ }
+
+ @Override
+ public Schema getRecommendedSchema() {
+ throw new UnsupportedOperationException(
+ "No recommended schema for decimal (scale is required)");
+ }
+
+ @Override
+ public String getLogicalTypeName() {
+ return "decimal";
+ }
+
+    /**
+     * Decodes a decimal from the readable bytes of the given buffer.
+     * <p>
+     * The bytes between the buffer's position and its limit are interpreted as
+     * a big-endian two's-complement unscaled value. Reading advances the
+     * buffer's position to its limit.
+     *
+     * @param value buffer holding the unscaled value
+     * @param schema the Schema of the value (not used by this conversion)
+     * @param type the decimal LogicalType, which supplies the scale
+     * @return a BigDecimal with the unscaled value read and the type's scale
+     */
+    @Override
+    public BigDecimal fromBytes(ByteBuffer value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      // ByteBuffer.get(byte[]) returns the buffer itself, so calling array() on
+      // its result would hand back the buffer's whole backing array (ignoring
+      // offset, position and limit, and failing for direct or read-only
+      // buffers). Copy into a fresh array instead, because BigInteger has no
+      // offset/length constructor.
+      byte[] bytes = new byte[value.remaining()];
+      value.get(bytes);
+      return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+ @Override
+ public ByteBuffer toBytes(BigDecimal value, Schema schema, LogicalType
type) {
+ int scale = ((LogicalTypes.Decimal) type).getScale();
+ if (scale != value.scale()) {
+ throw new AvroTypeException("Cannot encode decimal with scale " +
+ value.scale() + " as scale " + scale);
+ }
+ return ByteBuffer.wrap(value.unscaledValue().toByteArray());
+ }
+
+ @Override
+ public BigDecimal fromFixed(GenericFixed value, Schema schema, LogicalType
type) {
+ int scale = ((LogicalTypes.Decimal) type).getScale();
+ return new BigDecimal(new BigInteger(value.bytes()), scale);
+ }
+
+    /**
+     * Encodes a decimal as a fixed value of the Schema's size.
+     * <p>
+     * The two's-complement unscaled value is right-aligned in the result and
+     * the leading bytes are filled with the sign (0xFF for negative values,
+     * 0x00 otherwise).
+     *
+     * @param value the decimal to encode
+     * @param schema the fixed Schema; its size determines the result length
+     * @param type the decimal LogicalType, which supplies the expected scale
+     * @return a GenericData.Fixed holding the sign-extended unscaled value
+     * @throws AvroTypeException if the value's scale differs from the type's
+     *         scale, or if the unscaled value needs more bytes than the fixed
+     *         Schema provides
+     */
+    @Override
+    public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) {
+      int scale = ((LogicalTypes.Decimal) type).getScale();
+      if (scale != value.scale()) {
+        throw new AvroTypeException("Cannot encode decimal with scale " +
+            value.scale() + " as scale " + scale);
+      }
+
+      byte[] unscaled = value.unscaledValue().toByteArray();
+      byte[] bytes = new byte[schema.getFixedSize()];
+      int offset = bytes.length - unscaled.length;
+      if (offset < 0) {
+        // without this check the copy below would silently drop the
+        // high-order bytes and write a different number
+        throw new AvroTypeException("Cannot encode decimal " + value +
+            " as fixed(" + bytes.length + "): unscaled value requires " +
+            unscaled.length + " bytes");
+      }
+
+      // sign-extend into the leading bytes, then copy the value right-aligned
+      byte fillByte = (byte) (value.signum() < 0 ? 0xFF : 0x00);
+      for (int i = 0; i < offset; i += 1) {
+        bytes[i] = fillByte;
+      }
+      System.arraycopy(unscaled, 0, bytes, offset, unscaled.length);
+
+      return new GenericData.Fixed(schema, bytes);
+    }
+ }
+
+}
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalType.java
Wed May 27 04:48:58 2015
@@ -0,0 +1,77 @@
+package org.apache.avro;
+
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.specific.SpecificData;
+
+/**
+ * Logical types provide an opt-in way to extend Avro's types. Logical types
+ * specify a way of representing a high-level type as a base Avro type. For
+ * example, a date is specified as the number of days after the unix epoch (or
+ * before using a negative value). This enables extensions to Avro's type
+ * system without breaking binary compatibility. Older versions see the base
+ * type and ignore the logical type.
+ */
+public class LogicalType {
+
+ public static final String LOGICAL_TYPE_PROP = "logicalType";
+
+ private static final String[] INCOMPATIBLE_PROPS = new String[] {
+ GenericData.STRING_PROP, SpecificData.CLASS_PROP,
+ SpecificData.KEY_CLASS_PROP, SpecificData.ELEMENT_PROP
+ };
+
+ private final String name;
+
+ public LogicalType(String logicalTypeName) {
+ this.name = logicalTypeName.intern();
+ }
+
+ /**
+ * Get the name of this logical type.
+ * <p>
+ * This name is set as the Schema property "logicalType".
+ *
+ * @return the String name of the logical type
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Add this logical type to the given Schema.
+ * <p>
+ * The "logicalType" property will be set to this type's name, and other
+ * type-specific properties may be added. The Schema is first validated to
+ * ensure it is compatible.
+ *
+ * @param schema a Schema
+ * @return the modified Schema
+ * @throws IllegalArgumentException if the type and schema are incompatible
+ */
+ public Schema addToSchema(Schema schema) {
+ validate(schema);
+ schema.addProp(LOGICAL_TYPE_PROP, name);
+ schema.setLogicalType(this);
+ return schema;
+ }
+
+ /**
+ * Validate this logical type for the given Schema.
+ * <p>
+ * This will throw an exception if the Schema is incompatible with this type.
+ * For example, a date is stored as an int and is incompatible with a fixed
+ * Schema.
+ *
+ * @param schema a Schema
+ * @throws IllegalArgumentException if the type and schema are incompatible
+ */
+ public void validate(Schema schema) {
+ for (String incompatible : INCOMPATIBLE_PROPS) {
+ if (schema.getProp(incompatible) != null) {
+ throw new IllegalArgumentException(
+ LOGICAL_TYPE_PROP + " cannot be used with " + incompatible);
+ }
+ }
+ }
+
+}
Added: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java?rev=1681906&view=auto
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
(added)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/LogicalTypes.java
Wed May 27 04:48:58 2015
@@ -0,0 +1,218 @@
+package org.apache.avro;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.avro.util.WeakIdentityHashMap;
+
+public class LogicalTypes {
+
+ private static final Map<Schema, LogicalType> CACHE =
+ new WeakIdentityHashMap<Schema, LogicalType>();
+
+ public interface LogicalTypeFactory {
+ LogicalType fromSchema(Schema schema);
+ }
+
+ private static final Map<String, LogicalTypeFactory> REGISTERED_TYPES =
+ new ConcurrentHashMap<String, LogicalTypeFactory>();
+
+ /**
+ * Registers a factory used to build the {@link LogicalType} for schemas
+ * whose "logicalType" property equals the given name.
+ * <p>
+ * A later registration under the same name replaces the earlier one. The
+ * names "decimal" and "uuid" are matched before registered factories are
+ * consulted, so a factory registered under either name is never used.
+ *
+ * @param logicalTypeName the logical type name to register the factory for
+ * @param factory the factory that creates the LogicalType from a Schema
+ * @throws NullPointerException if the name or the factory is null
+ */
+ public static void register(String logicalTypeName, LogicalTypeFactory
factory) {
+ if (logicalTypeName == null) {
+ throw new NullPointerException("Invalid logical type name: null");
+ }
+ if (factory == null) {
+ throw new NullPointerException("Invalid logical type factory: null");
+ }
+ REGISTERED_TYPES.put(logicalTypeName, factory);
+ }
+
+ /**
+ * Returns the {@link LogicalType} from the schema, if one is present.
+ * <p>
+ * Unlike {@link #fromSchemaIgnoreInvalid(Schema)}, errors are not suppressed
+ * and the result is not cached.
+ *
+ * @param schema the Schema whose "logicalType" property is inspected
+ * @return the validated LogicalType, or null when the named type is neither
+ *         built in nor registered
+ * @throws RuntimeException any exception raised while building or
+ *         validating the logical type is rethrown to the caller
+ */
+ public static LogicalType fromSchema(Schema schema) {
+ return fromSchemaImpl(schema, true);
+ }
+
+ /**
+ * Returns the {@link LogicalType} for the schema, treating any failure to
+ * build or validate it as "no logical type".
+ * <p>
+ * Results are remembered per Schema instance in {@code CACHE}, including
+ * null results, so each Schema object is resolved at most once.
+ * <p>
+ * NOTE(review): CACHE is read and written here without synchronization;
+ * confirm WeakIdentityHashMap is safe when schemas are parsed concurrently.
+ *
+ * @param schema the Schema whose "logicalType" property is inspected
+ * @return the LogicalType, or null if there is none or it is invalid
+ */
+ public static LogicalType fromSchemaIgnoreInvalid(Schema schema) {
+ if (CACHE.containsKey(schema)) {
+ return CACHE.get(schema);
+ }
+
+ // throwErrors=false: runtime failures are swallowed and yield null
+ LogicalType logicalType = fromSchemaImpl(schema, false);
+
+ // add to the cache, even if it is null
+ CACHE.put(schema, logicalType);
+
+ return logicalType;
+ }
+
+  /**
+   * Builds and validates the LogicalType named by the schema's "logicalType"
+   * property.
+   * <p>
+   * The built-in names "decimal" and "uuid" are handled first; any other name
+   * is looked up among the registered factories.
+   *
+   * @param schema the Schema to inspect
+   * @param throwErrors if true, runtime exceptions raised while building or
+   *        validating the type are rethrown; if false they are swallowed and
+   *        null is returned
+   * @return the validated LogicalType, or null if the schema has no
+   *         "logicalType" property, the name is unknown, or the type is
+   *         invalid and {@code throwErrors} is false
+   */
+  private static LogicalType fromSchemaImpl(Schema schema, boolean throwErrors) {
+    String typeName = schema.getProp(LogicalType.LOGICAL_TYPE_PROP);
+    if (typeName == null) {
+      // No logical type annotation. Returning here also keeps a null key away
+      // from the ConcurrentHashMap lookup below, which would throw
+      // NullPointerException for every schema without a logical type.
+      return null;
+    }
+
+    LogicalType logicalType;
+    try {
+      if ("decimal".equals(typeName)) {
+        logicalType = new Decimal(schema);
+      } else if ("uuid".equals(typeName)) {
+        logicalType = UUID_TYPE;
+      } else {
+        // single lookup; null means no factory is registered for this name
+        LogicalTypeFactory factory = REGISTERED_TYPES.get(typeName);
+        logicalType = (factory == null) ? null : factory.fromSchema(schema);
+      }
+
+      // make sure the type is valid before returning it
+      if (logicalType != null) {
+        logicalType.validate(schema);
+      }
+    } catch (RuntimeException e) {
+      if (throwErrors) {
+        throw e;
+      }
+      // ignore invalid types
+      logicalType = null;
+    }
+
+    return logicalType;
+  }
+
+ /** Create a Decimal LogicalType with the given precision and scale 0 */
+ public static Decimal decimal(int precision) {
+ return decimal(precision, 0);
+ }
+
+ /** Create a Decimal LogicalType with the given precision and scale */
+ public static Decimal decimal(int precision, int scale) {
+ return new Decimal(precision, scale);
+ }
+
+ private static final LogicalType UUID_TYPE = new LogicalType("uuid");
+
+ public static LogicalType uuid() {
+ return UUID_TYPE;
+ }
+
+ /** Decimal represents arbitrary-precision fixed-scale decimal numbers */
+ public static class Decimal extends LogicalType {
+ private static final String PRECISION_PROP = "precision";
+ private static final String SCALE_PROP = "scale";
+
+ private final int precision;
+ private final int scale;
+
+ private Decimal(int precision, int scale) {
+ super("decimal");
+ this.precision = precision;
+ this.scale = scale;
+ }
+
+ private Decimal(Schema schema) {
+ super("decimal");
+ if (!hasProperty(schema, PRECISION_PROP)) {
+ throw new IllegalArgumentException(
+ "Invalid decimal: missing precision");
+ }
+
+ this.precision = getInt(schema, PRECISION_PROP);
+
+ if (hasProperty(schema, SCALE_PROP)) {
+ this.scale = getInt(schema, SCALE_PROP);
+ } else {
+ this.scale = 0;
+ }
+ }
+
+ @Override
+ public Schema addToSchema(Schema schema) {
+ super.addToSchema(schema);
+ schema.addProp(PRECISION_PROP, precision);
+ schema.addProp(SCALE_PROP, scale);
+ return schema;
+ }
+
+ public int getPrecision() {
+ return precision;
+ }
+
+ public int getScale() {
+ return scale;
+ }
+
+ @Override
+ public void validate(Schema schema) {
+ super.validate(schema);
+ // validate the type
+ if (schema.getType() != Schema.Type.FIXED &&
+ schema.getType() != Schema.Type.BYTES) {
+ throw new IllegalArgumentException(
+ "Logical type decimal must be backed by fixed or bytes");
+ }
+ if (precision <= 0) {
+ throw new IllegalArgumentException("Invalid decimal precision: " +
+ precision + " (must be positive)");
+ } else if (precision > maxPrecision(schema)) {
+ throw new IllegalArgumentException(
+ "fixed(" + schema.getFixedSize() + ") cannot store " +
+ precision + " digits (max " + maxPrecision(schema) + ")");
+ }
+ if (scale < 0) {
+ throw new IllegalArgumentException("Invalid decimal scale: " +
+ scale + " (must be positive)");
+ } else if (scale > precision) {
+ throw new IllegalArgumentException("Invalid decimal scale: " +
+ scale + " (greater than precision: " + precision + ")");
+ }
+ }
+
+ private long maxPrecision(Schema schema) {
+ if (schema.getType() == Schema.Type.BYTES) {
+ // not bounded
+ return Integer.MAX_VALUE;
+ } else if (schema.getType() == Schema.Type.FIXED) {
+ int size = schema.getFixedSize();
+ return Math.round( // convert double to long
+ Math.floor(Math.log10( // number of base-10 digits
+ Math.pow(2, 8 * size - 1) - 1) // max value stored
+ ));
+ } else {
+ // not valid for any other type
+ return 0;
+ }
+ }
+
+ private boolean hasProperty(Schema schema, String name) {
+ return (schema.getObjectProp(name) != null);
+ }
+
+ private int getInt(Schema schema, String name) {
+ Object obj = schema.getObjectProp(name);
+ if (obj instanceof Integer) {
+ return (Integer) obj;
+ }
+ throw new IllegalArgumentException("Expected int " + name + ": " +
+ (obj == null ? "null" : obj + ":" + obj.getClass().getSimpleName()));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Decimal decimal = (Decimal) o;
+
+ if (precision != decimal.precision) return false;
+ if (scale != decimal.scale) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = precision;
+ result = 31 * result + scale;
+ return result;
+ }
+ }
+}
Modified: avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
--- avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java
(original)
+++ avro/trunk/lang/java/avro/src/main/java/org/apache/avro/Schema.java Wed May
27 04:48:58 2015
@@ -98,6 +98,7 @@ public abstract class Schema extends Jso
};
private final Type type;
+ private LogicalType logicalType = null;
Schema(Type type) {
super(SCHEMA_RESERVED);
@@ -138,6 +139,14 @@ public abstract class Schema extends Jso
hashCode = NO_HASHCODE;
}
+ public LogicalType getLogicalType() {
+ return logicalType;
+ }
+
+ void setLogicalType(LogicalType logicalType) {
+ this.logicalType = logicalType;
+ }
+
/** Create an anonymous record schema. */
public static Schema createRecord(List<Field> fields) {
Schema result = createRecord(null, null, null, false);
@@ -1310,6 +1319,8 @@ public abstract class Schema extends Jso
if (!SCHEMA_RESERVED.contains(prop)) // ignore reserved
result.addProp(prop, schema.get(prop));
}
+ // parse logical type if present
+ result.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(result);
names.space(savedSpace); // restore space
if (result instanceof NamedSchema) {
Set<String> aliases = parseAliases(schema);
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java
Wed May 27 04:48:58 2015
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.IdentityHashMap;
import java.util.WeakHashMap;
import java.util.Iterator;
import java.util.List;
@@ -32,6 +33,8 @@ import java.util.Map;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
@@ -59,7 +62,7 @@ public class GenericData {
/** Used to specify the Java type for a string schema. */
public enum StringType { CharSequence, String, Utf8 };
- protected static final String STRING_PROP = "avro.java.string";
+ public static final String STRING_PROP = "avro.java.string";
protected static final String STRING_TYPE_STRING = "String";
private final ClassLoader classLoader;
@@ -91,6 +94,47 @@ public class GenericData {
/** Return the class loader that's used (by subclasses). */
public ClassLoader getClassLoader() { return classLoader; }
+ public Map<String, Conversion<?>> conversions =
+ new HashMap<String, Conversion<?>>();
+
+ public Map<Class<?>, Conversion<?>> conversionsByClass =
+ new IdentityHashMap<Class<?>, Conversion<?>>();
+
+ public void addLogicalTypeConversion(Conversion<?> conversion) {
+ conversions.put(conversion.getLogicalTypeName(), conversion);
+ conversionsByClass.put(conversion.getConvertedType(), conversion);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> Conversion<? super T> getConversionFrom(Class<T> datumClass,
+ LogicalType logicalType) {
+ Conversion<?> conversion = conversionsByClass.get(datumClass);
+ if (conversion != null &&
+ conversion.getLogicalTypeName().equals(logicalType.getName())) {
+ return (Conversion<T>) conversion;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> Conversion<? extends T> getConversionTo(Class<T> datumClass,
+ LogicalType logicalType) {
+ Conversion<?> conversion = conversionsByClass.get(datumClass);
+ if (conversion != null &&
+ conversion.getLogicalTypeName().equals(logicalType.getName())) {
+ return (Conversion<T>) conversion;
+ }
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Conversion<Object> getConversionFor(LogicalType logicalType) {
+ if (logicalType == null) {
+ return null;
+ }
+ return (Conversion<Object>) conversions.get(logicalType.getName());
+ }
+
/** Default implementation of {@link GenericRecord}. Note that this
implementation
* does not fill in default values for fields if they are not specified; use
{@link
* GenericRecordBuilder} in that case.
@@ -609,6 +653,24 @@ public class GenericData {
/** Return the index for a datum within a union. Implemented with {@link
* Schema#getIndexNamed(String)} and {@link #getSchemaName(Object)}.*/
public int resolveUnion(Schema union, Object datum) {
+ // if there is a logical type that works, use it first
+ // this allows logical type concrete classes to overlap with supported ones
+ // for example, a conversion could return a map
+ if (datum != null) {
+ Conversion<?> conversion = conversionsByClass.get(datum.getClass());
+ if (conversion != null) {
+ String logicalTypeName = conversion.getLogicalTypeName();
+ List<Schema> candidates = union.getTypes();
+ for (int i = 0; i < candidates.size(); i += 1) {
+ LogicalType candidateLogicalType =
candidates.get(i).getLogicalType();
+ if (candidateLogicalType != null &&
+ logicalTypeName.equals(candidateLogicalType.getName())) {
+ return i;
+ }
+ }
+ }
+ }
+
Integer i = union.getIndexNamed(getSchemaName(datum));
if (i != null)
return i;
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumReader.java
Wed May 27 04:48:58 2015
@@ -27,6 +27,8 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.DatumReader;
@@ -147,6 +149,27 @@ public class GenericDatumReader<D> imple
/** Called to read data.*/
protected Object read(Object old, Schema expected,
ResolvingDecoder in) throws IOException {
+ Object datum = readWithoutConversion(old, expected, in);
+ LogicalType logicalType = expected.getLogicalType();
+ if (logicalType != null) {
+ Conversion<?> conversion = getData().getConversionFor(logicalType);
+ if (conversion != null) {
+ return convert(datum, expected, logicalType, conversion);
+ }
+ }
+ return datum;
+ }
+
+ protected Object readWithConversion(Object old, Schema expected,
+ LogicalType logicalType,
+ Conversion<?> conversion,
+ ResolvingDecoder in) throws IOException {
+ return convert(readWithoutConversion(old, expected, in),
+ expected, logicalType, conversion);
+ }
+
+ protected Object readWithoutConversion(Object old, Schema expected,
+ ResolvingDecoder in) throws IOException {
switch (expected.getType()) {
case RECORD: return readRecord(old, expected, in);
case ENUM: return readEnum(expected, in);
@@ -165,7 +188,31 @@ public class GenericDatumReader<D> imple
default: throw new AvroRuntimeException("Unknown type: " + expected);
}
}
-
+
+ protected Object convert(Object datum, Schema schema, LogicalType type,
+ Conversion<?> conversion) {
+ try {
+ switch (schema.getType()) {
+ case RECORD: return conversion.fromRecord((IndexedRecord) datum,
schema, type);
+ case ENUM: return conversion.fromEnumSymbol((GenericEnumSymbol)
datum, schema, type);
+ case ARRAY: return
conversion.fromArray(getData().getArrayAsCollection(datum), schema, type);
+ case MAP: return conversion.fromMap((Map<?, ?>) datum, schema, type);
+ case FIXED: return conversion.fromFixed((GenericFixed) datum, schema,
type);
+ case STRING: return conversion.fromCharSequence((CharSequence) datum,
schema, type);
+ case BYTES: return conversion.fromBytes((ByteBuffer) datum, schema,
type);
+ case INT: return conversion.fromInt((Integer) datum, schema, type);
+ case LONG: return conversion.fromLong((Long) datum, schema, type);
+ case FLOAT: return conversion.fromFloat((Float) datum, schema, type);
+ case DOUBLE: return conversion.fromDouble((Double) datum, schema, type);
+ case BOOLEAN: return conversion.fromBoolean((Boolean) datum, schema,
type);
+ }
+ return datum;
+ } catch (ClassCastException e) {
+ throw new AvroRuntimeException("Cannot convert " + datum + ":" +
+ datum.getClass().getSimpleName() + ": expected generic type", e);
+ }
+ }
+
/** Called to read a record instance. May be overridden for alternate record
* representations.*/
protected Object readRecord(Object old, Schema expected,
@@ -213,10 +260,20 @@ public class GenericDatumReader<D> imple
long l = in.readArrayStart();
long base = 0;
if (l > 0) {
+ LogicalType logicalType = expectedType.getLogicalType();
+ Conversion<?> conversion = getData().getConversionFor(logicalType);
Object array = newArray(old, (int) l, expected);
do {
- for (long i = 0; i < l; i++) {
- addToArray(array, base + i, read(peekArray(array), expectedType,
in));
+ if (logicalType != null && conversion != null) {
+ for (long i = 0; i < l; i++) {
+ addToArray(array, base + i, readWithConversion(
+ peekArray(array), expectedType, logicalType, conversion, in));
+ }
+ } else {
+ for (long i = 0; i < l; i++) {
+ addToArray(array, base + i, readWithoutConversion(
+ peekArray(array), expectedType, in));
+ }
}
base += l;
} while ((l = in.arrayNext()) > 0);
@@ -249,11 +306,21 @@ public class GenericDatumReader<D> imple
ResolvingDecoder in) throws IOException {
Schema eValue = expected.getValueType();
long l = in.readMapStart();
+ LogicalType logicalType = eValue.getLogicalType();
+ Conversion<?> conversion = getData().getConversionFor(logicalType);
Object map = newMap(old, (int) l);
if (l > 0) {
do {
- for (int i = 0; i < l; i++) {
- addToMap(map, readMapKey(null, expected, in), read(null, eValue,
in));
+ if (logicalType != null && conversion != null) {
+ for (int i = 0; i < l; i++) {
+ addToMap(map, readMapKey(null, expected, in),
+ readWithConversion(null, eValue, logicalType, conversion, in));
+ }
+ } else {
+ for (int i = 0; i < l; i++) {
+ addToMap(map, readMapKey(null, expected, in),
+ readWithoutConversion(null, eValue, in));
+ }
}
} while ((l = in.mapNext()) > 0);
}
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/generic/GenericDatumWriter.java
Wed May 27 04:48:58 2015
@@ -25,6 +25,8 @@ import java.util.Map;
import java.util.Collection;
import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.io.DatumWriter;
@@ -57,9 +59,46 @@ public class GenericDatumWriter<D> imple
public void write(D datum, Encoder out) throws IOException {
write(root, datum, out);
}
-
+
/** Called to write data.*/
protected void write(Schema schema, Object datum, Encoder out)
+ throws IOException {
+ LogicalType logicalType = schema.getLogicalType();
+ if (datum != null && logicalType != null) {
+ Conversion<?> conversion = getData()
+ .getConversionFrom(datum.getClass(), logicalType);
+ writeWithoutConversion(schema,
+ convert(schema, logicalType, conversion, datum), out);
+ } else {
+ writeWithoutConversion(schema, datum, out);
+ }
+ }
+
+ private <T> Object convert(Schema schema, LogicalType logicalType,
+ Conversion<T> conversion, Object datum) {
+ if (conversion == null) {
+ return datum;
+ }
+ Class<T> fromClass = conversion.getConvertedType();
+ switch (schema.getType()) {
+ case RECORD: return conversion.toRecord(fromClass.cast(datum), schema,
logicalType);
+ case ENUM: return conversion.toEnumSymbol(fromClass.cast(datum),
schema, logicalType);
+ case ARRAY: return conversion.toArray(fromClass.cast(datum), schema,
logicalType);
+ case MAP: return conversion.toMap(fromClass.cast(datum), schema,
logicalType);
+ case FIXED: return conversion.toFixed(fromClass.cast(datum), schema,
logicalType);
+ case STRING: return conversion.toCharSequence(fromClass.cast(datum),
schema, logicalType);
+ case BYTES: return conversion.toBytes(fromClass.cast(datum), schema,
logicalType);
+ case INT: return conversion.toInt(fromClass.cast(datum), schema,
logicalType);
+ case LONG: return conversion.toLong(fromClass.cast(datum), schema,
logicalType);
+ case FLOAT: return conversion.toFloat(fromClass.cast(datum), schema,
logicalType);
+ case DOUBLE: return conversion.toDouble(fromClass.cast(datum), schema,
logicalType);
+ case BOOLEAN: return conversion.toBoolean(fromClass.cast(datum), schema,
logicalType);
+ }
+ return datum;
+ }
+
+ /** Called to write data.*/
+ protected void writeWithoutConversion(Schema schema, Object datum, Encoder
out)
throws IOException {
try {
switch (schema.getType()) {
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectData.java
Wed May 27 04:48:58 2015
@@ -41,10 +41,13 @@ import java.util.concurrent.ConcurrentHa
import org.apache.avro.AvroRemoteException;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.AvroTypeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Protocol;
import org.apache.avro.Protocol.Message;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericContainer;
+import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryData;
@@ -379,6 +382,12 @@ public class ReflectData extends Specifi
@Override
public Class getClass(Schema schema) {
+ // see if the element class will be converted and use that class
+ Conversion<?> conversion = getConversionFor(schema.getLogicalType());
+ if (conversion != null) {
+ return conversion.getConvertedType();
+ }
+
switch (schema.getType()) {
case ARRAY:
Class collectionClass = getClassProp(schema, CLASS_PROP);
@@ -552,6 +561,11 @@ public class ReflectData extends Specifi
return Schema.create(Schema.Type.BYTES);
if (Collection.class.isAssignableFrom(c)) // array
throw new AvroRuntimeException("Can't find element type of
Collection");
+ for (Conversion<?> conversion : conversions.values()) { // logical type
+ if (conversion.getConvertedType().isAssignableFrom(c)) {
+ return conversion.getRecommendedSchema();
+ }
+ }
String fullName = c.getName();
Schema schema = names.get(fullName);
if (schema == null) {
@@ -859,5 +873,32 @@ public class ReflectData extends Specifi
schema.addAlias(alias.alias(), space);
}
}
-
+
+  @Override
+  public Object createFixed(Object old, Schema schema) {
+    // When a conversion is registered for this schema's logical type,
+    // getClass returns the converted class, which SpecificData would try
+    // (and fail) to instantiate; fall back to a generic fixed in that case.
+    boolean hasConversion = schema.getLogicalType() != null
+        && getConversionFor(schema.getLogicalType()) != null;
+    if (!hasConversion) {
+      return super.createFixed(old, schema);
+    }
+    return new GenericData.Fixed(schema);
+  }
+
+  @Override
+  public Object newRecord(Object old, Schema schema) {
+    // A schema whose logical type has a registered conversion maps (via
+    // getClass) to the converted class, which SpecificData cannot construct,
+    // so such records are created as generic records instead.
+    boolean hasConversion = schema.getLogicalType() != null
+        && getConversionFor(schema.getLogicalType()) != null;
+    return hasConversion
+        ? new GenericData.Record(schema)
+        : super.newRecord(old, schema);
+  }
}
Modified:
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java?rev=1681906&r1=1681905&r2=1681906&view=diff
==============================================================================
---
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
(original)
+++
avro/trunk/lang/java/avro/src/main/java/org/apache/avro/reflect/ReflectDatumReader.java
Wed May 27 04:48:58 2015
@@ -25,6 +25,8 @@ import java.util.Collection;
import java.util.Map;
import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.Conversion;
+import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.generic.IndexedRecord;
@@ -75,6 +77,16 @@ public class ReflectDatumReader<T> exten
Class<?> elementClass =
ReflectData.getClassProp(schema, SpecificData.ELEMENT_PROP);
+ if (elementClass == null) {
+ // see if the element class will be converted and use that class
+ // logical types cannot conflict with java-element-class
+ Conversion<?> elementConversion = getData()
+ .getConversionFor(schema.getElementType().getLogicalType());
+ if (elementConversion != null) {
+ elementClass = elementConversion.getConvertedType();
+ }
+ }
+
if (collectionClass == null && elementClass == null)
return super.newArray(old, size, schema); // use specific/generic
@@ -163,26 +175,52 @@ public class ReflectDatumReader<T> exten
private Object readObjectArray(Object[] array, Schema expectedType, long l,
ResolvingDecoder in) throws IOException {
+ LogicalType logicalType = expectedType.getLogicalType();
+ Conversion<?> conversion = getData().getConversionFor(logicalType);
int index = 0;
- do {
- int limit = index + (int) l;
- while (index < limit) {
- Object element = read(null, expectedType, in);
- array[index] = element;
- index++;
- }
- } while ((l = in.arrayNext()) > 0);
+ if (logicalType != null && conversion != null) {
+ do {
+ int limit = index + (int) l;
+ while (index < limit) {
+ Object element = readWithConversion(
+ null, expectedType, logicalType, conversion, in);
+ array[index] = element;
+ index++;
+ }
+ } while ((l = in.arrayNext()) > 0);
+ } else {
+ do {
+ int limit = index + (int) l;
+ while (index < limit) {
+ Object element = readWithoutConversion(null, expectedType, in);
+ array[index] = element;
+ index++;
+ }
+ } while ((l = in.arrayNext()) > 0);
+ }
return array;
}
private Object readCollection(Collection<Object> c, Schema expectedType,
long l, ResolvingDecoder in) throws IOException {
- do {
- for (int i = 0; i < l; i++) {
- Object element = read(null, expectedType, in);
- c.add(element);
- }
- } while ((l = in.arrayNext()) > 0);
+ LogicalType logicalType = expectedType.getLogicalType();
+ Conversion<?> conversion = getData().getConversionFor(logicalType);
+ if (logicalType != null && conversion != null) {
+ do {
+ for (int i = 0; i < l; i++) {
+ Object element = readWithConversion(
+ null, expectedType, logicalType, conversion, in);
+ c.add(element);
+ }
+ } while ((l = in.arrayNext()) > 0);
+ } else {
+ do {
+ for (int i = 0; i < l; i++) {
+ Object element = readWithoutConversion(null, expectedType, in);
+ c.add(element);
+ }
+ } while ((l = in.arrayNext()) > 0);
+ }
return c;
}
@@ -245,6 +283,28 @@ public class ReflectDatumReader<T> exten
throw new AvroRuntimeException("Failed to read Stringable", e);
}
}
+ LogicalType logicalType = f.schema().getLogicalType();
+ if (logicalType != null) {
+ Conversion<?> conversion = getData().getConversionTo(
+ accessor.getField().getType(), logicalType);
+ if (conversion != null) {
+ try {
+ accessor.set(record, convert(
+ readWithoutConversion(oldDatum, f.schema(), in),
+ f.schema(), logicalType, conversion));
+ } catch (IllegalAccessException e) {
+ throw new AvroRuntimeException("Failed to set " + f);
+ }
+ return;
+ }
+ }
+ try {
+ accessor.set(record,
+ readWithoutConversion(oldDatum, f.schema(), in));
+ return;
+ } catch (IllegalAccessException e) {
+ throw new AvroRuntimeException("Failed to set " + f);
+ }
}
}
super.readField(record, f, oldDatum, in, state);
Added:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java?rev=1681906&view=auto
==============================================================================
---
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
(added)
+++
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/TestLogicalType.java
Wed May 27 04:48:58 2015
@@ -0,0 +1,279 @@
+package org.apache.avro;
+
+import java.util.Arrays;
+import java.util.concurrent.Callable;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestLogicalType {
+
+ @Test
+ public void testDecimalFromSchema() {
+ Schema schema = Schema.createFixed("aFixed", null, null, 4);
+ schema.addProp("logicalType", "decimal");
+ schema.addProp("precision", 9);
+ schema.addProp("scale", 2);
+ LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+
+ Assert.assertTrue("Should be a Decimal",
+ logicalType instanceof LogicalTypes.Decimal);
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
+ Assert.assertEquals("Should have correct precision",
+ 9, decimal.getPrecision());
+ Assert.assertEquals("Should have correct scale",
+ 2, decimal.getScale());
+ }
+
+ @Test
+ public void testInvalidLogicalTypeIgnored() {
+ final Schema schema = Schema.createFixed("aFixed", null, null, 2);
+ schema.addProp("logicalType", "decimal");
+ schema.addProp("precision", 9);
+ schema.addProp("scale", 2);
+
+ Assert.assertNull("Should ignore invalid logical type",
+ LogicalTypes.fromSchemaIgnoreInvalid(schema));
+ }
+
+ @Test
+ public void testDecimalWithNonByteArrayTypes() {
+ final LogicalType decimal = LogicalTypes.decimal(5, 2);
+ // test simple types
+ Schema[] nonBytes = new Schema[] {
+ Schema.createRecord("Record", null, null, false),
+ Schema.createArray(Schema.create(Schema.Type.BYTES)),
+ Schema.createMap(Schema.create(Schema.Type.BYTES)),
+ Schema.createEnum("Enum", null, null, Arrays.asList("a", "b")),
+ Schema.createUnion(Arrays.asList(
+ Schema.create(Schema.Type.BYTES),
+ Schema.createFixed("fixed", null, null, 4))),
+ Schema.create(Schema.Type.BOOLEAN), Schema.create(Schema.Type.INT),
+ Schema.create(Schema.Type.LONG), Schema.create(Schema.Type.FLOAT),
+ Schema.create(Schema.Type.DOUBLE), Schema.create(Schema.Type.NULL),
+ Schema.create(Schema.Type.STRING) };
+ for (final Schema schema : nonBytes) {
+ assertThrows("Should reject type: " + schema.getType(),
+ IllegalArgumentException.class,
+ "Logical type decimal must be backed by fixed or bytes", new
Callable() {
+ @Override
+ public Object call() throws Exception {
+ decimal.addToSchema(schema);
+ return null;
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testUnknownFromJsonNode() {
+ Schema schema = Schema.create(Schema.Type.STRING);
+ schema.addProp("logicalType", "unknown");
+ schema.addProp("someProperty", 34);
+ LogicalType logicalType = LogicalTypes.fromSchemaIgnoreInvalid(schema);
+ Assert.assertNull("Should not return a LogicalType instance", logicalType);
+ }
+
+ @Test
+ public void testDecimalBytesHasNoPrecisionLimit() {
+ Schema schema = Schema.create(Schema.Type.BYTES);
+ // precision is not limited for bytes
+ LogicalTypes.decimal(Integer.MAX_VALUE).addToSchema(schema);
+ Assert.assertEquals("Precision should be an Integer.MAX_VALUE",
+ Integer.MAX_VALUE,
+ ((LogicalTypes.Decimal)
LogicalTypes.fromSchemaIgnoreInvalid(schema)).getPrecision());
+ }
+
+ @Test
+ public void testDecimalFixedPrecisionLimit() {
+ // 4 bytes can hold up to 9 digits of precision
+ final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ assertThrows("Should reject precision", IllegalArgumentException.class,
+ "fixed(4) cannot store 10 digits (max 9)", new Callable() {
+ @Override
+ public Object call() throws Exception {
+ LogicalTypes.decimal(10).addToSchema(schema);
+ return null;
+ }
+ }
+ );
+ Assert.assertNull("Invalid logical type should not be set on schema",
+ LogicalTypes.fromSchemaIgnoreInvalid(schema));
+ }
+
+ @Test
+ public void testDecimalFailsWithZeroPrecision() {
+ final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ assertThrows("Should reject precision", IllegalArgumentException.class,
+ "Invalid decimal precision: 0 (must be positive)", new Callable() {
+ @Override
+ public Object call() throws Exception {
+ LogicalTypes.decimal(0).addToSchema(schema);
+ return null;
+ }
+ });
+ Assert.assertNull("Invalid logical type should not be set on schema",
+ LogicalTypes.fromSchemaIgnoreInvalid(schema));
+ }
+
+ @Test
+ public void testDecimalFailsWithNegativePrecision() {
+ final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ assertThrows("Should reject precision", IllegalArgumentException.class,
+ "Invalid decimal precision: -9 (must be positive)", new Callable() {
+ @Override
+ public Object call() throws Exception {
+ LogicalTypes.decimal(-9).addToSchema(schema);
+ return null;
+ }
+ });
+ Assert.assertNull("Invalid logical type should not be set on schema",
+ LogicalTypes.fromSchemaIgnoreInvalid(schema));
+ }
+
+  /** A decimal whose scale exceeds its precision must be rejected. */
+  @Test
+  public void testDecimalScaleBoundedByPrecision() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    // scale 10 is larger than precision 9, so it is the scale that is invalid
+    assertThrows("Should reject scale", IllegalArgumentException.class,
+        "Invalid decimal scale: 10 (greater than precision: 9)",
+        new Callable<Object>() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(9, 10).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+  /** A decimal with a negative scale must be rejected. */
+  @Test
+  public void testDecimalFailsWithNegativeScale() {
+    final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+    // precision 9 is valid for fixed(4); only the scale (-2) is invalid
+    assertThrows("Should reject scale", IllegalArgumentException.class,
+        "Invalid decimal scale: -2 (must be positive)",
+        new Callable<Object>() {
+          @Override
+          public Object call() throws Exception {
+            LogicalTypes.decimal(9, -2).addToSchema(schema);
+            return null;
+          }
+        });
+    Assert.assertNull("Invalid logical type should not be set on schema",
+        LogicalTypes.fromSchemaIgnoreInvalid(schema));
+  }
+
+ @Test
+ public void testSchemaRejectsSecondLogicalType() {
+ final Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ LogicalTypes.decimal(9).addToSchema(schema);
+ assertThrows("Should reject second logical type",
+ AvroRuntimeException.class,
+ "Can't overwrite property: scale", new Callable() {
+ @Override
+ public Object call() throws Exception {
+ LogicalTypes.decimal(9, 2).addToSchema(schema);
+ return null;
+ }
+ }
+ );
+ Assert.assertEquals("First logical type should still be set on schema",
+ LogicalTypes.decimal(9), LogicalTypes.fromSchemaIgnoreInvalid(schema));
+ }
+
+ @Test
+ public void testDecimalDefaultScale() {
+ Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ // 4 bytes can hold up to 9 digits of precision
+ LogicalTypes.decimal(9).addToSchema(schema);
+ Assert.assertEquals("Scale should be a 0",
+ 0,
+ ((LogicalTypes.Decimal)
LogicalTypes.fromSchemaIgnoreInvalid(schema)).getScale());
+ }
+
+ @Test
+ public void testFixedDecimalToFromJson() {
+ Schema schema = Schema.createFixed("aDecimal", null, null, 4);
+ LogicalTypes.decimal(9, 2).addToSchema(schema);
+ Schema parsed = new Schema.Parser().parse(schema.toString(true));
+ Assert.assertEquals("Constructed and parsed schemas should match",
+ schema, parsed);
+ }
+
+ @Test
+ public void testBytesDecimalToFromJson() {
+ Schema schema = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(9, 2).addToSchema(schema);
+ Schema parsed = new Schema.Parser().parse(schema.toString(true));
+ Assert.assertEquals("Constructed and parsed schemas should match",
+ schema, parsed);
+ }
+
+ @Test
+ public void testLogicalTypeEquals() {
+ LogicalTypes.Decimal decimal90 = LogicalTypes.decimal(9);
+ LogicalTypes.Decimal decimal80 = LogicalTypes.decimal(8);
+ LogicalTypes.Decimal decimal92 = LogicalTypes.decimal(9, 2);
+
+ assertEqualsTrue("Same decimal", LogicalTypes.decimal(9, 0), decimal90);
+ assertEqualsTrue("Same decimal", LogicalTypes.decimal(8, 0), decimal80);
+ assertEqualsTrue("Same decimal", LogicalTypes.decimal(9, 2), decimal92);
+ assertEqualsFalse("Different logical type", LogicalTypes.uuid(),
decimal90);
+ assertEqualsFalse("Different precision", decimal90, decimal80);
+ assertEqualsFalse("Different scale", decimal90, decimal92);
+ }
+
+ @Test
+ public void testLogicalTypeInSchemaEquals() {
+ Schema schema1 = Schema.createFixed("aDecimal", null, null, 4);
+ Schema schema2 = Schema.createFixed("aDecimal", null, null, 4);
+ Schema schema3 = Schema.createFixed("aDecimal", null, null, 4);
+ Assert.assertNotSame(schema1, schema2);
+ Assert.assertNotSame(schema1, schema3);
+ assertEqualsTrue("No logical types", schema1, schema2);
+ assertEqualsTrue("No logical types", schema1, schema3);
+
+ LogicalTypes.decimal(9).addToSchema(schema1);
+ assertEqualsFalse("Two has no logical type", schema1, schema2);
+
+ LogicalTypes.decimal(9).addToSchema(schema2);
+ assertEqualsTrue("Same logical types", schema1, schema2);
+
+ LogicalTypes.decimal(9, 2).addToSchema(schema3);
+ assertEqualsFalse("Different logical type", schema1, schema3);
+ }
+
+ public static void assertEqualsTrue(String message, Object o1, Object o2) {
+ Assert.assertTrue("Should be equal (forward): " + message, o1.equals(o2));
+ Assert.assertTrue("Should be equal (reverse): " + message, o2.equals(o1));
+ }
+
+  /**
+   * Asserts that two objects are not equal in either direction, i.e. that
+   * both {@code o1.equals(o2)} and {@code o2.equals(o1)} are false.
+   * @param message A String describing what differs between the objects
+   * @param o1 the first object, must not be null
+   * @param o2 the second object, must not be null
+   */
+  public static void assertEqualsFalse(String message, Object o1, Object o2) {
+    Assert.assertFalse("Should not be equal (forward): " + message,
+        o1.equals(o2));
+    Assert.assertFalse("Should not be equal (reverse): " + message,
+        o2.equals(o1));
+  }
+
+  /**
+   * A convenience method to avoid a large number of @Test(expected=...) tests.
+   * Runs the callable and fails unless it throws an exception of exactly the
+   * expected class whose message contains the given text.
+   * @param message A String message to describe this assertion
+   * @param expected An Exception class that the Callable should throw
+   * @param containedInMessage A String that should be contained by the thrown
+   *                           exception's message
+   * @param callable A Callable that is expected to throw the exception
+   */
+  public static void assertThrows(String message,
+                                  Class<? extends Exception> expected,
+                                  String containedInMessage,
+                                  Callable<?> callable) {
+    try {
+      callable.call();
+      // Assert.fail throws an AssertionError, which is not an Exception and
+      // therefore is not swallowed by the catch block below
+      Assert.fail("No exception was thrown (" + message + "), expected: " +
+          expected.getName());
+    } catch (Exception actual) {
+      Assert.assertEquals(message, expected, actual.getClass());
+      // getMessage() may be null; report that as a failed assertion rather
+      // than letting a NullPointerException hide the real problem
+      String actualMessage = actual.getMessage();
+      Assert.assertTrue(
+          "Expected exception message (" + containedInMessage + ") missing: " +
+              actualMessage,
+          actualMessage != null && actualMessage.contains(containedInMessage)
+      );
+    }
+  }
+}
Added:
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
URL:
http://svn.apache.org/viewvc/avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java?rev=1681906&view=auto
==============================================================================
---
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
(added)
+++
avro/trunk/lang/java/avro/src/test/java/org/apache/avro/generic/TestGenericLogicalTypes.java
Wed May 27 04:48:58 2015
@@ -0,0 +1,217 @@
+package org.apache.avro.generic;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.avro.Conversion;
+import org.apache.avro.Conversions;
+import org.apache.avro.LogicalType;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+public class TestGenericLogicalTypes {
+
+ @Rule
+ public TemporaryFolder temp = new TemporaryFolder();
+
+ public static final GenericData GENERIC = new GenericData();
+
+ @BeforeClass
+ public static void addDecimalAndUUID() {
+ GENERIC.addLogicalTypeConversion(new Conversions.DecimalConversion());
+ GENERIC.addLogicalTypeConversion(new Conversions.UUIDConversion());
+ }
+
+ @Test
+ public void testReadUUID() throws IOException {
+ Schema uuidSchema = Schema.create(Schema.Type.STRING);
+ LogicalTypes.uuid().addToSchema(uuidSchema);
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+ List<UUID> expected = Arrays.asList(u1, u2);
+
+ File test = write(Schema.create(Schema.Type.STRING),
+ u1.toString(), u2.toString());
+ Assert.assertEquals("Should convert Strings to UUIDs",
+ expected, read(GENERIC.createDatumReader(uuidSchema), test));
+ }
+
+ @Test
+ public void testWriteUUID() throws IOException {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+ stringSchema.addProp(GenericData.STRING_PROP, "String");
+ Schema uuidSchema = Schema.create(Schema.Type.STRING);
+ LogicalTypes.uuid().addToSchema(uuidSchema);
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+ List<String> expected = Arrays.asList(u1.toString(), u2.toString());
+
+ File test = write(GENERIC, uuidSchema, u1, u2);
+ Assert.assertEquals("Should read UUIDs as Strings",
+ expected, read(GenericData.get().createDatumReader(stringSchema),
test));
+ }
+
+ @Test
+ public void testWriteNullableUUID() throws IOException {
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+ stringSchema.addProp(GenericData.STRING_PROP, "String");
+ Schema nullableStringSchema = Schema.createUnion(
+ Schema.create(Schema.Type.NULL), stringSchema);
+
+ Schema uuidSchema = Schema.create(Schema.Type.STRING);
+ LogicalTypes.uuid().addToSchema(uuidSchema);
+ Schema nullableUuidSchema = Schema.createUnion(
+ Schema.create(Schema.Type.NULL), uuidSchema);
+
+ UUID u1 = UUID.randomUUID();
+ UUID u2 = UUID.randomUUID();
+ List<String> expected = Arrays.asList(u1.toString(), u2.toString());
+
+ File test = write(GENERIC, nullableUuidSchema, u1, u2);
+ Assert.assertEquals("Should read UUIDs as Strings",
+ expected,
+ read(GenericData.get().createDatumReader(nullableStringSchema), test));
+ }
+
+ @Test
+ public void testReadDecimalFixed() throws IOException {
+ LogicalType decimal = LogicalTypes.decimal(9, 2);
+ Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+ Schema decimalSchema = decimal.addToSchema(
+ Schema.createFixed("aFixed", null, null, 4));
+
+ BigDecimal d1 = new BigDecimal("-34.34");
+ BigDecimal d2 = new BigDecimal("117230.00");
+ List<BigDecimal> expected = Arrays.asList(d1, d2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ GenericFixed d1fixed = conversion.toFixed(d1, fixedSchema, decimal);
+ GenericFixed d2fixed = conversion.toFixed(d2, fixedSchema, decimal);
+
+ File test = write(fixedSchema, d1fixed, d2fixed);
+ Assert.assertEquals("Should convert fixed to BigDecimals",
+ expected, read(GENERIC.createDatumReader(decimalSchema), test));
+ }
+
+ @Test
+ public void testWriteDecimalFixed() throws IOException {
+ LogicalType decimal = LogicalTypes.decimal(9, 2);
+ Schema fixedSchema = Schema.createFixed("aFixed", null, null, 4);
+ Schema decimalSchema = decimal.addToSchema(
+ Schema.createFixed("aFixed", null, null, 4));
+
+ BigDecimal d1 = new BigDecimal("-34.34");
+ BigDecimal d2 = new BigDecimal("117230.00");
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ GenericFixed d1fixed = conversion.toFixed(d1, fixedSchema, decimal);
+ GenericFixed d2fixed = conversion.toFixed(d2, fixedSchema, decimal);
+ List<GenericFixed> expected = Arrays.asList(d1fixed, d2fixed);
+
+ File test = write(GENERIC, decimalSchema, d1, d2);
+ Assert.assertEquals("Should read BigDecimals as fixed",
+ expected, read(GenericData.get().createDatumReader(fixedSchema),
test));
+ }
+
+ @Test
+ public void testReadDecimalBytes() throws IOException {
+ LogicalType decimal = LogicalTypes.decimal(9, 2);
+ Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+ Schema decimalSchema =
decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+
+ BigDecimal d1 = new BigDecimal("-34.34");
+ BigDecimal d2 = new BigDecimal("117230.00");
+ List<BigDecimal> expected = Arrays.asList(d1, d2);
+
+ Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+ // use the conversion directly instead of relying on the write side
+ ByteBuffer d1bytes = conversion.toBytes(d1, bytesSchema, decimal);
+ ByteBuffer d2bytes = conversion.toBytes(d2, bytesSchema, decimal);
+
+ File test = write(bytesSchema, d1bytes, d2bytes);
+ Assert.assertEquals("Should convert bytes to BigDecimals",
+ expected, read(GENERIC.createDatumReader(decimalSchema), test));
+ }
+
+  /**
+   * Writes BigDecimals through a model with the decimal conversion registered
+   * and checks that a plain bytes reader sees the converted bytes.
+   */
+  @Test
+  public void testWriteDecimalBytes() throws IOException {
+    LogicalType decimal = LogicalTypes.decimal(9, 2);
+    Schema bytesSchema = Schema.create(Schema.Type.BYTES);
+    Schema decimalSchema =
+        decimal.addToSchema(Schema.create(Schema.Type.BYTES));
+
+    BigDecimal d1 = new BigDecimal("-34.34");
+    BigDecimal d2 = new BigDecimal("117230.00");
+
+    Conversion<BigDecimal> conversion = new Conversions.DecimalConversion();
+
+    // build the expected raw bytes with the conversion directly, so the
+    // expectation does not depend on the write path under test
+    ByteBuffer d1bytes = conversion.toBytes(d1, bytesSchema, decimal);
+    ByteBuffer d2bytes = conversion.toBytes(d2, bytesSchema, decimal);
+    List<ByteBuffer> expected = Arrays.asList(d1bytes, d2bytes);
+
+    // write the BigDecimals themselves (as testWriteDecimalFixed does) so the
+    // writer actually has to apply the decimal conversion
+    File test = write(GENERIC, decimalSchema, d1, d2);
+    Assert.assertEquals("Should read BigDecimals as bytes",
+        expected, read(GenericData.get().createDatumReader(bytesSchema),
+            test));
+  }
+
+  /**
+   * Reads every datum from an Avro data file into a list, closing the file
+   * reader afterwards.
+   * @param reader the DatumReader used to decode each datum
+   * @param file the data file to read
+   * @return the data in file order
+   */
+  private <D> List<D> read(DatumReader<D> reader, File file)
+      throws IOException {
+    List<D> results = new ArrayList<D>();
+    // if opening the file throws there is nothing to close yet
+    FileReader<D> input = new DataFileReader<D>(file, reader);
+    try {
+      for (D record : input) {
+        results.add(record);
+      }
+      return results;
+    } finally {
+      input.close();
+    }
+  }
+
+ private <D> File write(Schema schema, D... data) throws IOException {
+ return write(GenericData.get(), schema, data);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <D> File write(GenericData model, Schema schema, D... data) throws
IOException {
+ File file = temp.newFile();
+ DatumWriter<D> writer = model.createDatumWriter(schema);
+ DataFileWriter<D> fileWriter = new DataFileWriter<D>(writer);
+
+ try {
+ fileWriter.create(schema, file);
+ for (D datum : data) {
+ fileWriter.append(datum);
+ }
+ } finally {
+ fileWriter.close();
+ }
+
+ return file;
+ }
+}