AVRO-1704: Java: Add support for single-message encoding.
Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/9d9d5e76 Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/9d9d5e76 Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/9d9d5e76 Branch: refs/heads/branch-1.8 Commit: 9d9d5e76c64f168b3d8c91e0afbdbf6dd87b9a7d Parents: 1fbf64b Author: Ryan Blue <[email protected]> Authored: Sun Jul 24 15:20:37 2016 -0700 Committer: Ryan Blue <[email protected]> Committed: Sat Nov 5 13:13:35 2016 -0700 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../apache/avro/message/BadHeaderException.java | 34 +++ .../avro/message/BinaryMessageDecoder.java | 190 +++++++++++++++ .../avro/message/BinaryMessageEncoder.java | 122 ++++++++++ .../org/apache/avro/message/MessageDecoder.java | 159 ++++++++++++ .../org/apache/avro/message/MessageEncoder.java | 50 ++++ .../avro/message/MissingSchemaException.java | 35 +++ .../apache/avro/message/RawMessageDecoder.java | 101 ++++++++ .../apache/avro/message/RawMessageEncoder.java | 129 ++++++++++ .../org/apache/avro/message/SchemaStore.java | 63 +++++ .../avro/util/ReusableByteArrayInputStream.java | 35 +++ .../util/ReusableByteBufferInputStream.java | 89 +++++++ .../avro/message/TestBinaryMessageEncoding.java | 241 +++++++++++++++++++ .../main/java/org/apache/avro/GuavaClasses.java | 2 + 14 files changed, 1252 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9fd0e06..1d8b11d 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,8 @@ Trunk (not yet released) NEW FEATURES + AVRO-1704: Java: Add support for single-message encoding. 
(blue) + OPTIMIZATIONS IMPROVEMENTS http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java new file mode 100644 index 0000000..38c0001 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/BadHeaderException.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.AvroRuntimeException; + +/** + * Exception thrown by a {@link MessageDecoder} when a message header is not + * recognized. + * <p> + * This usually indicates that the encoded bytes were not an Avro message. 
+ */ +public class BadHeaderException extends AvroRuntimeException { + public BadHeaderException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java new file mode 100644 index 0000000..11a7336 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageDecoder.java @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import com.google.common.collect.MapMaker; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.generic.GenericData; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Map; + +/** + * A {@link MessageDecoder} that reads a binary-encoded datum. 
This checks for + * the datum header and decodes the payload with the schema that corresponds to + * the 8-byte schema fingerprint. + * <p> + * Instances can decode message payloads for known {@link Schema schemas}, which + * are schemas added using {@link #addSchema(Schema)}, schemas resolved by the + * {@link SchemaStore} passed to the constructor, or the expected schema passed + * to the constructor. Messages encoded using an unknown schema will cause + * instances to throw a {@link MissingSchemaException}. + * <p> + * It is safe to continue using instances of this class after {@link #decode} + * throws {@link BadHeaderException} or {@link MissingSchemaException}. + * <p> + * This class is thread-safe. + */ +public class BinaryMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> { + + private static final ThreadLocal<byte[]> HEADER_BUFFER = + new ThreadLocal<byte[]>() { + @Override + protected byte[] initialValue() { + return new byte[10]; + } + }; + + private static final ThreadLocal<ByteBuffer> FP_BUFFER = + new ThreadLocal<ByteBuffer>() { + @Override + protected ByteBuffer initialValue() { + byte[] header = HEADER_BUFFER.get(); + return ByteBuffer.wrap(header).order(ByteOrder.LITTLE_ENDIAN); + } + }; + + private final GenericData model; + private final Schema readSchema; + private final SchemaStore resolver; + + private final Map<Long, RawMessageDecoder<D>> codecByFingerprint = + new MapMaker().makeMap(); + + /** + * Creates a new {@link BinaryMessageDecoder} that uses the given + * {@link GenericData data model} to construct datum instances described by + * the {@link Schema schema}. + * <p> + * The {@code readSchema} is used as the expected schema (read schema). Datum + * instances created by this class are described by the expected schema. + * <p> + * The schema used to decode incoming buffers is determined by the schema + * fingerprint encoded in the message header. 
This class can decode messages + * that were encoded using the {@code readSchema} and other schemas that are + * added using {@link #addSchema(Schema)}. + * + * @param model the {@link GenericData data model} for datum instances + * @param readSchema the {@link Schema} used to construct datum instances + */ + public BinaryMessageDecoder(GenericData model, Schema readSchema) { + this(model, readSchema, null); + } + + /** + * Creates a new {@link BinaryMessageDecoder} that uses the given + * {@link GenericData data model} to construct datum instances described by + * the {@link Schema schema}. + * <p> + * The {@code readSchema} is used as the expected schema (read schema). Datum + * instances created by this class are described by the expected schema. + * <p> + * The schema used to decode incoming buffers is determined by the schema + * fingerprint encoded in the message header. This class can decode messages + * that were encoded using the {@code readSchema}, other schemas that are + * added using {@link #addSchema(Schema)}, or schemas returned by the + * {@code resolver}. + * + * @param model the {@link GenericData data model} for datum instances + * @param readSchema the {@link Schema} used to construct datum instances + * @param resolver a {@link SchemaStore} used to find schemas by fingerprint + */ + public BinaryMessageDecoder(GenericData model, Schema readSchema, + SchemaStore resolver) { + this.model = model; + this.readSchema = readSchema; + this.resolver = resolver; + addSchema(readSchema); + } + + /** + * Adds a {@link Schema} that can be used to decode buffers. 
+ * + * @param writeSchema a {@link Schema} to use when decoding buffers + */ + public void addSchema(Schema writeSchema) { + long fp = SchemaNormalization.parsingFingerprint64(writeSchema); + codecByFingerprint.put(fp, + new RawMessageDecoder<D>(model, writeSchema, readSchema)); + } + + /** + * Returns the decoder registered for a schema fingerprint. If none is + * registered and a {@link SchemaStore} was supplied, the store is asked for + * the schema, which is then registered with {@link #addSchema(Schema)}. + * + * @param fp a 64-bit schema fingerprint read from a message header + * @return the decoder for payloads written with the matching schema + * @throws MissingSchemaException if no schema is known for the fingerprint + */ + private RawMessageDecoder<D> getDecoder(long fp) { + RawMessageDecoder<D> decoder = codecByFingerprint.get(fp); + if (decoder != null) { + return decoder; + } + + if (resolver != null) { + Schema writeSchema = resolver.findByFingerprint(fp); + if (writeSchema != null) { + addSchema(writeSchema); + return codecByFingerprint.get(fp); + } + } + + throw new MissingSchemaException( + "Cannot resolve schema for fingerprint: " + fp); + } + + /** + * Reads the 10-byte message header, validates the two marker bytes, and + * decodes the payload with the decoder selected by the little-endian + * fingerprint stored in header bytes 2-9. + * + * @throws BadHeaderException if the stream ends before the header is + * complete or either marker byte does not match the V1 header + * @throws MissingSchemaException if the fingerprint's schema is unknown + * @throws IOException if reading the header from the stream fails + */ + @Override + public D decode(InputStream stream, D reuse) throws IOException { + byte[] header = HEADER_BUFFER.get(); + try { + if (!readFully(stream, header)) { + throw new BadHeaderException("Not enough header bytes"); + } + } catch (IOException e) { + throw new IOException("Failed to read header and fingerprint bytes", e); + } + + // Reject the message unless BOTH marker bytes match; a mismatch in either + // byte means this is not a V1 header. + if (BinaryMessageEncoder.V1_HEADER[0] != header[0] + || BinaryMessageEncoder.V1_HEADER[1] != header[1]) { + // %02X prints each byte as two unsigned hex digits; %h would print the + // sign-extended hash code (for example ffffffc3 for 0xC3). + throw new BadHeaderException(String.format( + "Unrecognized header bytes: 0x%02X%02X", + header[0], header[1])); + } + + // FP_BUFFER wraps this thread's header array, so the fingerprint is read + // directly from the bytes that were just filled in. + RawMessageDecoder<D> decoder = getDecoder(FP_BUFFER.get().getLong(2)); + + return decoder.decode(stream, reuse); + } + + /** + * Reads a buffer from a stream, making multiple read calls if necessary. 
+ * + * @param stream an InputStream to read from + * @param bytes a buffer + * @return true if the buffer is complete, false otherwise (stream ended) + * @throws IOException + */ + private boolean readFully(InputStream stream, byte[] bytes) + throws IOException { + int pos = 0; + int bytesRead; + while ((bytes.length - pos) > 0 && + (bytesRead = stream.read(bytes, pos, bytes.length - pos)) > 0) { + pos += bytesRead; + } + return (pos == bytes.length); + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java new file mode 100644 index 0000000..3cf3d0c --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/BinaryMessageEncoder.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.avro.message; + +import com.google.common.primitives.Bytes; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import org.apache.avro.generic.GenericData; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.security.NoSuchAlgorithmException; + +/** + * A {@link MessageEncoder} that adds a header and 8-byte schema fingerprint to + * each datum encoded as binary. + * <p> + * This class is thread-safe. + */ +public class BinaryMessageEncoder<D> implements MessageEncoder<D> { + + static final byte[] V1_HEADER = new byte[] {(byte) 0xC3, (byte) 0x01}; + + private final RawMessageEncoder<D> writeCodec; + + /** + * Creates a new {@link BinaryMessageEncoder} that uses the given + * {@link GenericData data model} to deconstruct datum instances described by + * the {@link Schema schema}. + * <p> + * Buffers returned by {@link #encode(D)} are copied and will not be modified + * by future calls to {@code encode}. + * + * @param model the {@link GenericData data model} for datum instances + * @param schema the {@link Schema} for datum instances + */ + public BinaryMessageEncoder(GenericData model, Schema schema) { + this(model, schema, true); + } + + /** + * Creates a new {@link BinaryMessageEncoder} that uses the given + * {@link GenericData data model} to deconstruct datum instances described by + * the {@link Schema schema}. + * <p> + * If {@code shouldCopy} is true, then buffers returned by {@link #encode(D)} + * are copied and will not be modified by future calls to {@code encode}. + * <p> + * If {@code shouldCopy} is false, then buffers returned by {@code encode} + * wrap a thread-local buffer that can be reused by future calls to + * {@code encode}, but may not be. Callers should only set {@code shouldCopy} + * to false if the buffer will be copied before the current thread's next call + * to {@code encode}. 
+ * + * @param model the {@link GenericData data model} for datum instances + * @param schema the {@link Schema} for datum instances + * @param shouldCopy whether to copy buffers before returning encoded results + */ + public BinaryMessageEncoder(GenericData model, Schema schema, + boolean shouldCopy) { + this.writeCodec = new V1MessageEncoder<D>(model, schema, shouldCopy); + } + + @Override + public ByteBuffer encode(D datum) throws IOException { + return writeCodec.encode(datum); + } + + @Override + public void encode(D datum, OutputStream stream) throws IOException { + writeCodec.encode(datum, stream); + } + + /** + * This is a RawMessageEncoder that adds the V1 header to the outgoing buffer. + * BinaryMessageEncoder wraps this class to avoid confusion over what it does. + * It should not have an "is a" relationship with RawMessageEncoder because it + * adds the extra bytes. + */ + private static class V1MessageEncoder<D> extends RawMessageEncoder<D> { + private final byte[] headerBytes; + + V1MessageEncoder(GenericData model, Schema schema, boolean shouldCopy) { + super(model, schema, shouldCopy); + this.headerBytes = getWriteHeader(schema); + } + + @Override + public void encode(D datum, OutputStream stream) throws IOException { + stream.write(headerBytes); + super.encode(datum, stream); + } + + private static byte[] getWriteHeader(Schema schema) { + try { + byte[] fp = SchemaNormalization + .parsingFingerprint("CRC-64-AVRO", schema); + return Bytes.concat(V1_HEADER, fp); + } catch (NoSuchAlgorithmException e) { + throw new AvroRuntimeException(e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java new file mode 100644 index 0000000..bc86d12 --- 
/dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageDecoder.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.util.ReusableByteArrayInputStream; +import org.apache.avro.util.ReusableByteBufferInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +/** + * Deserializes a single datum from a ByteBuffer, byte array, or InputStream. + * @param <D> a datum class + */ +public interface MessageDecoder<D> { + + /** + * Deserialize a single datum from an InputStream. + * + * @param stream stream to read from + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. + * @throws MissingSchemaException If the payload's schema cannot be found. + * @throws IOException + */ + D decode(InputStream stream) throws IOException; + + /** + * Deserialize a single datum from an InputStream. + * + * @param stream stream to read from + * @param reuse a datum instance to reuse, avoiding instantiation if possible + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. 
+ * @throws MissingSchemaException If the payload's schema cannot be found. + * @throws IOException + */ + D decode(InputStream stream, D reuse) throws IOException; + + /** + * Deserialize a single datum from a ByteBuffer. + * + * @param encoded a ByteBuffer containing an encoded datum + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. + * @throws MissingSchemaException If the payload's schema cannot be found. + * @throws IOException + */ + D decode(ByteBuffer encoded) throws IOException; + + /** + * Deserialize a single datum from a ByteBuffer. + * + * @param encoded a ByteBuffer containing an encoded datum + * @param reuse a datum instance to reuse, avoiding instantiation if possible + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. + * @throws MissingSchemaException If the payload's schema cannot be found. + * @throws IOException + */ + D decode(ByteBuffer encoded, D reuse) throws IOException; + + /** + * Deserialize a single datum from a byte array. + * + * @param encoded a byte array containing an encoded datum + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. + * @throws MissingSchemaException If the payload's schema cannot be found. + * @throws IOException + */ + D decode(byte[] encoded) throws IOException; + + /** + * Deserialize a single datum from a byte array. + * + * @param encoded a byte array containing an encoded datum + * @param reuse a datum instance to reuse, avoiding instantiation if possible + * @return a datum read from the stream + * @throws BadHeaderException If the payload's header is not recognized. + * @throws MissingSchemaException If the payload's schema cannot be found. 
+ * @throws IOException + */ + D decode(byte[] encoded, D reuse) throws IOException; + + /** + * Base class for {@link MessageDecoder} implementations that provides default + * implementations for most of the {@code MessageDecoder} API. + * <p> + * Implementations provided by this base class are thread-safe. + * + * @param <D> a datum class + */ + abstract class BaseDecoder<D> implements MessageDecoder<D> { + + private static final ThreadLocal<ReusableByteArrayInputStream> + BYTE_ARRAY_IN = new ThreadLocal<ReusableByteArrayInputStream>() { + @Override + protected ReusableByteArrayInputStream initialValue() { + return new ReusableByteArrayInputStream(); + } + }; + + private static final ThreadLocal<ReusableByteBufferInputStream> + BYTE_BUFFER_IN = new ThreadLocal<ReusableByteBufferInputStream>() { + @Override + protected ReusableByteBufferInputStream initialValue() { + return new ReusableByteBufferInputStream(); + } + }; + + @Override + public D decode(InputStream stream) throws IOException { + return decode(stream, null); + } + + @Override + public D decode(ByteBuffer encoded) throws IOException { + return decode(encoded, null); + } + + @Override + public D decode(byte[] encoded) throws IOException { + return decode(encoded, null); + } + + @Override + public D decode(ByteBuffer encoded, D reuse) throws IOException { + ReusableByteBufferInputStream in = BYTE_BUFFER_IN.get(); + in.setByteBuffer(encoded); + return decode(in, reuse); + } + + @Override + public D decode(byte[] encoded, D reuse) throws IOException { + ReusableByteArrayInputStream in = BYTE_ARRAY_IN.get(); + in.setByteArray(encoded, 0, encoded.length); + return decode(in, reuse); + } + + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java 
b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java new file mode 100644 index 0000000..60bfb79 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/MessageEncoder.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * Serializes an individual datum as a ByteBuffer or to an OutputStream. + * @param <D> a datum class + */ +public interface MessageEncoder<D> { + + /** + * Serialize a single datum to a ByteBuffer. + * + * @param datum a datum + * @return a ByteBuffer containing the serialized datum + * @throws IOException + */ + ByteBuffer encode(D datum) throws IOException; + + /** + * Serialize a single datum to an OutputStream. 
+ * + * @param datum a datum + * @param stream an OutputStream to serialize the datum to + * @throws IOException + */ + void encode(D datum, OutputStream stream) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java new file mode 100644 index 0000000..a3b89fd --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/MissingSchemaException.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.AvroRuntimeException; + +/** + * Exception thrown by a {@link MessageDecoder} when the message is encoded + * using an unknown {@link org.apache.avro.Schema}. + * <p> + * Using a {@link SchemaStore} to provide schemas to the decoder can avoid this + * problem. 
+ */ +public class MissingSchemaException extends AvroRuntimeException { + public MissingSchemaException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java new file mode 100644 index 0000000..52a7c2e --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageDecoder.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import java.io.IOException; +import java.io.InputStream; + +/** + * A {@link MessageDecoder} that deserializes from raw datum bytes. 
+ * <p> + * This class uses the schema passed to its constructor when decoding buffers. + * To decode buffers that have different schemas, use + * {@link BinaryMessageEncoder} and {@link BinaryMessageDecoder}. + * <p> + * This will not throw {@link BadHeaderException} because it expects no header, + * and will not throw {@link MissingSchemaException} because it always uses the + * read schema from its constructor. + * <p> + * This class is thread-safe. + */ +public class RawMessageDecoder<D> extends MessageDecoder.BaseDecoder<D> { + + private static final ThreadLocal<BinaryDecoder> DECODER = + new ThreadLocal<BinaryDecoder>(); + + private final Schema writeSchema; + private final Schema readSchema; + private final DatumReader<D> reader; + + /** + * Creates a new {@link RawMessageDecoder} that uses the given + * {@link GenericData data model} to construct datum instances described by + * the {@link Schema schema}. + * <p> + * The {@code schema} is used as both the expected schema (read schema) and + * for the schema of payloads that are decoded (written schema). + * + * @param model the {@link GenericData data model} for datum instances + * @param schema the {@link Schema} used to construct datum instances and to + * decode buffers. + */ + public RawMessageDecoder(GenericData model, Schema schema) { + this(model, schema, schema); + } + + /** + * Creates a new {@link RawMessageDecoder} that uses the given + * {@link GenericData data model} to construct datum instances described by + * the {@link Schema readSchema}. + * <p> + * The {@code readSchema} is used for the expected schema and the + * {@code writeSchema} is the schema used to decode buffers. The + * {@code writeSchema} must be the schema that was used to encode all buffers + * decoded by this class. 
+ * + * @param model the {@link GenericData data model} for datum instances + * @param readSchema the {@link Schema} used to construct datum instances + * @param writeSchema the {@link Schema} used to decode buffers + */ + public RawMessageDecoder(GenericData model, Schema writeSchema, + Schema readSchema) { + this.writeSchema = writeSchema; + this.readSchema = readSchema; + this.reader = model.createDatumReader(this.writeSchema, this.readSchema); + } + + @Override + public D decode(InputStream stream, D reuse) { + BinaryDecoder decoder = DecoderFactory.get() + .directBinaryDecoder(stream, DECODER.get()); + DECODER.set(decoder); + try { + return reader.read(reuse, decoder); + } catch (IOException e) { + throw new AvroRuntimeException("Decoding datum failed", e); + } + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java new file mode 100644 index 0000000..07ed861 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/RawMessageEncoder.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.EncoderFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; + +/** + * A {@link MessageEncoder} that encodes only a datum's bytes, without additional + * information (such as a schema fingerprint). + * <p> + * This class is thread-safe. + */ +public class RawMessageEncoder<D> implements MessageEncoder<D> { + + private static final ThreadLocal<BufferOutputStream> TEMP = + new ThreadLocal<BufferOutputStream>() { + @Override + protected BufferOutputStream initialValue() { + return new BufferOutputStream(); + } + }; + + private static final ThreadLocal<BinaryEncoder> ENCODER = + new ThreadLocal<BinaryEncoder>(); + + private final Schema writeSchema; + private final boolean copyOutputBytes; + private final DatumWriter<D> writer; + + /** + * Creates a new {@link RawMessageEncoder} that uses the given + * {@link GenericData data model} to deconstruct datum instances described by + * the {@link Schema schema}. + * <p> + * Buffers returned by {@link #encode(D)} are copied and will not be modified + * by future calls to {@code encode}. 
+ * + * @param model the {@link GenericData data model} for datum instances + * @param schema the {@link Schema} for datum instances + */ + public RawMessageEncoder(GenericData model, Schema schema) { + this(model, schema, true); + } + + /** + * Creates a new {@link RawMessageEncoder} that uses the given + * {@link GenericData data model} to deconstruct datum instances described by + * the {@link Schema schema}. + * <p> + * If {@code shouldCopy} is true, then buffers returned by {@link #encode(Object)} + * are copied and will not be modified by future calls to {@code encode}. + * <p> + * If {@code shouldCopy} is false, then buffers returned by {@code encode} + * wrap a thread-local buffer that can be reused by future calls to + * {@code encode}, but may not be. Callers should only set {@code shouldCopy} + * to false if the buffer will be copied before the current thread's next call + * to {@code encode}. + * + * @param model the {@link GenericData data model} for datum instances + * @param schema the {@link Schema} for datum instances + * @param shouldCopy whether to copy buffers before returning encoded results + */ + public RawMessageEncoder(GenericData model, Schema schema, boolean shouldCopy) { + this.writeSchema = schema; + this.copyOutputBytes = shouldCopy; + this.writer = model.createDatumWriter(this.writeSchema); + } + + @Override + public ByteBuffer encode(D datum) throws IOException { + BufferOutputStream temp = TEMP.get(); + temp.reset(); + + encode(datum, temp); + + if (copyOutputBytes) { + return temp.toBufferWithCopy(); + } else { + return temp.toBufferWithoutCopy(); + } + } + + @Override + public void encode(D datum, OutputStream stream) throws IOException { + BinaryEncoder encoder = EncoderFactory.get() + .directBinaryEncoder(stream, ENCODER.get()); + ENCODER.set(encoder); + writer.write(datum, encoder); + encoder.flush(); + } + + private static class BufferOutputStream extends ByteArrayOutputStream { + BufferOutputStream() { + } + + ByteBuffer 
toBufferWithoutCopy() { + return ByteBuffer.wrap(buf, 0, count); + } + + ByteBuffer toBufferWithCopy() { + return ByteBuffer.wrap(toByteArray()); + } + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java new file mode 100644 index 0000000..6e89b52 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/message/SchemaStore.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.message; + +import com.google.common.collect.MapMaker; +import org.apache.avro.Schema; +import org.apache.avro.SchemaNormalization; +import java.util.Map; + +/** + * Interface for classes that can provide avro schemas by fingerprint. + */ +public interface SchemaStore { + + /** + * Retrieves a schema by its AVRO-CRC-64 fingerprint. 
+ * @param fingerprint an AVRO-CRC-64 fingerprint long + * @return a Schema with the given fingerprint, or null + */ + Schema findByFingerprint(long fingerprint); + + /** + * A map-based cache of schemas by AVRO-CRC-64 fingerprint. + * <p> + * This class is thread-safe. + */ + class Cache implements SchemaStore { + private final Map<Long, Schema> schemas = new MapMaker().makeMap(); + + /** + * Adds a schema to this cache that can be retrieved using its AVRO-CRC-64 + * fingerprint. + * + * @param schema a {@link Schema} + */ + public void addSchema(Schema schema) { + long fp = SchemaNormalization.parsingFingerprint64(schema); + schemas.put(fp, schema); + } + + @Override + public Schema findByFingerprint(long fingerprint) { + return schemas.get(fingerprint); + } + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java new file mode 100644 index 0000000..6fd2ae4 --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteArrayInputStream.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.util; + +import java.io.ByteArrayInputStream; + +public class ReusableByteArrayInputStream extends ByteArrayInputStream { + public ReusableByteArrayInputStream() { + super(new byte[0]); + } + + public void setByteArray(byte[] buf, int offset, int length) { + this.buf = buf; + this.pos = offset; + this.count = Math.min(offset + length, buf.length); + this.mark = offset; + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java new file mode 100644 index 0000000..eff7fdc --- /dev/null +++ b/lang/java/avro/src/main/java/org/apache/avro/util/ReusableByteBufferInputStream.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.avro.util; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class ReusableByteBufferInputStream extends InputStream { + + private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0); + + private ByteBuffer buffer = EMPTY_BUFFER; + private int mark = 0; + + public void setByteBuffer(ByteBuffer buf) { + // do not modify the buffer that is passed in + this.buffer = buf.duplicate(); + this.mark = buf.position(); + } + + @Override + public int read() throws IOException { + if (buffer.hasRemaining()) { + return buffer.get() & 0xff; + } else { + return -1; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (buffer.remaining() <= 0) { + return -1; + } + // allow IndexOutOfBoundsException to be thrown by ByteBuffer#get + int bytesToRead = Math.min(len, buffer.remaining()); + buffer.get(b, off, bytesToRead); + return bytesToRead; + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + // n may be negative and results in skipping 0 bytes, according to javadoc + return 0; + } + + // this catches n > Integer.MAX_VALUE + int bytesToSkip = n > buffer.remaining() ? buffer.remaining() : (int) n; + buffer.position(buffer.position() + bytesToSkip); + return bytesToSkip; + } + + @Override + public synchronized void mark(int readLimit) { + // readLimit is ignored. there is no requirement to implement readLimit, it + // is a way for implementations to avoid buffering too much. 
since all data + // for this stream is held in memory, this has no need for such a limit. + this.mark = buffer.position(); + } + + @Override + public synchronized void reset() throws IOException { + buffer.position(mark); + } + + @Override + public boolean markSupported() { + return true; + } +} http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java new file mode 100644 index 0000000..47656b8 --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/message/TestBinaryMessageEncoding.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.avro.message; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Ordering; +import com.google.common.collect.Sets; +import org.apache.avro.AvroRuntimeException; +import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericData.Record; +import org.apache.avro.generic.GenericRecordBuilder; +import org.junit.Assert; +import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.Set; + +public class TestBinaryMessageEncoding { + public static final Schema SCHEMA_V1 = SchemaBuilder.record("TestRecord") + .fields() + .requiredInt("id") + .optionalString("msg") + .endRecord(); + + public static final GenericRecordBuilder V1_BUILDER = + new GenericRecordBuilder(SCHEMA_V1); + + public static final List<Record> V1_RECORDS = Arrays.asList( + V1_BUILDER.set("id", 1).set("msg", "m-1").build(), + V1_BUILDER.set("id", 2).set("msg", "m-2").build(), + V1_BUILDER.set("id", 4).set("msg", "m-4").build(), + V1_BUILDER.set("id", 6).set("msg", "m-6").build() + ); + + public static final Schema SCHEMA_V2 = SchemaBuilder.record("TestRecord") + .fields() + .requiredLong("id") + .name("message").aliases("msg").type().optional().stringType() + .optionalDouble("data") + .endRecord(); + + public static final GenericRecordBuilder V2_BUILDER = + new GenericRecordBuilder(SCHEMA_V2); + + public static final List<Record> V2_RECORDS = Arrays.asList( + V2_BUILDER.set("id", 3L).set("message", "m-3").set("data", 12.3).build(), + V2_BUILDER.set("id", 5L).set("message", "m-5").set("data", 23.4).build(), + V2_BUILDER.set("id", 7L).set("message", "m-7").set("data", 34.5).build(), + V2_BUILDER.set("id", 8L).set("message", "m-8").set("data", 35.6).build() + ); + + @Test + public void testByteBufferRoundTrip() throws Exception { + MessageEncoder<Record> encoder = 
new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V2); + MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + + Record copy = decoder.decode(encoder.encode(V2_RECORDS.get(0))); + + Assert.assertTrue("Copy should not be the same object", + copy != V2_RECORDS.get(0)); + Assert.assertEquals("Record should be identical after round-trip", + V2_RECORDS.get(0), copy); + } + + @Test + public void testSchemaEvolution() throws Exception { + List<ByteBuffer> buffers = Lists.newArrayList(); + List<Record> records = Ordering.usingToString().sortedCopy( + Iterables.concat(V1_RECORDS, V2_RECORDS)); + + MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V1); + MessageEncoder<Record> v2Encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V2); + + for (Record record : records) { + if (record.getSchema() == SCHEMA_V1) { + buffers.add(v1Encoder.encode(record)); + } else { + buffers.add(v2Encoder.encode(record)); + } + } + + Set<Record> allAsV2 = Sets.newHashSet(V2_RECORDS); + allAsV2.add( + V2_BUILDER.set("id", 1L).set("message", "m-1").clear("data").build()); + allAsV2.add( + V2_BUILDER.set("id", 2L).set("message", "m-2").clear("data").build()); + allAsV2.add( + V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build()); + allAsV2.add( + V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build()); + + BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + v2Decoder.addSchema(SCHEMA_V1); + + Set<Record> decodedUsingV2 = Sets.newHashSet(); + for (ByteBuffer buffer : buffers) { + decodedUsingV2.add(v2Decoder.decode(buffer)); + } + + Assert.assertEquals(allAsV2, decodedUsingV2); + } + + @Test(expected = MissingSchemaException.class) + public void testCompatibleReadFailsWithoutSchema() throws Exception { + MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>( + 
GenericData.get(), SCHEMA_V1); + BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + + ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3)); + + v2Decoder.decode(v1Buffer); + } + + @Test + public void testCompatibleReadWithSchema() throws Exception { + MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V1); + BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + v2Decoder.addSchema(SCHEMA_V1); + + ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(3)); + + Record record = v2Decoder.decode(v1Buffer); + + Assert.assertEquals( + V2_BUILDER.set("id", 6L).set("message", "m-6").clear("data").build(), + record); + } + + @Test + public void testCompatibleReadWithSchemaFromLookup() throws Exception { + MessageEncoder<Record> v1Encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V1); + + SchemaStore.Cache schemaCache = new SchemaStore.Cache(); + schemaCache.addSchema(SCHEMA_V1); + BinaryMessageDecoder<Record> v2Decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2, schemaCache); + + ByteBuffer v1Buffer = v1Encoder.encode(V1_RECORDS.get(2)); + + Record record = v2Decoder.decode(v1Buffer); + + Assert.assertEquals( + V2_BUILDER.set("id", 4L).set("message", "m-4").clear("data").build(), + record); + } + + @Test + public void testBufferReuse() throws Exception { + // This test depends on the serialized version of record 1 being smaller or + // the same size as record 0 so that the reused ByteArrayOutputStream won't + // expand its internal buffer. 
+ MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V1, false); + + ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0)); + ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1)); + + Assert.assertEquals(b0.array(), b1.array()); + + MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V1); + Assert.assertEquals("Buffer was reused, decode(b0) should be record 1", + V1_RECORDS.get(1), decoder.decode(b0)); + } + + @Test + public void testBufferCopy() throws Exception { + MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V1); + + ByteBuffer b0 = encoder.encode(V1_RECORDS.get(0)); + ByteBuffer b1 = encoder.encode(V1_RECORDS.get(1)); + + Assert.assertNotEquals(b0.array(), b1.array()); + + MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V1); + // bytes are not changed by reusing the encoder + Assert.assertEquals("Buffer was copied, decode(b0) should be record 0", + V1_RECORDS.get(0), decoder.decode(b0)); + } + + @Test(expected = AvroRuntimeException.class) + public void testByteBufferMissingPayload() throws Exception { + MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V2); + MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + + ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0)); + + buffer.limit(12); + + decoder.decode(buffer); + } + + @Test(expected = BadHeaderException.class) + public void testByteBufferMissingFullHeader() throws Exception { + MessageEncoder<Record> encoder = new BinaryMessageEncoder<Record>( + GenericData.get(), SCHEMA_V2); + MessageDecoder<Record> decoder = new BinaryMessageDecoder<Record>( + GenericData.get(), SCHEMA_V2); + + ByteBuffer buffer = encoder.encode(V2_RECORDS.get(0)); + + buffer.limit(8); + + decoder.decode(buffer); + } + +} 
http://git-wip-us.apache.org/repos/asf/avro/blob/9d9d5e76/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java ---------------------------------------------------------------------- diff --git a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java index fd77f2d..25d918f 100644 --- a/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java +++ b/lang/java/guava/src/main/java/org/apache/avro/GuavaClasses.java @@ -19,6 +19,7 @@ package org.apache.avro; import com.google.common.collect.MapMaker; +import com.google.common.primitives.Bytes; class GuavaClasses { /* @@ -27,5 +28,6 @@ class GuavaClasses { */ static { MapMaker.class.getName(); + Bytes.class.getName(); } }
