This is an automated email from the ASF dual-hosted git repository.

rskraba pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit e2e57aac2d03589c14a3298be95f78481d8b4d51
Author: Ryan Skraba <[email protected]>
AuthorDate: Sat Aug 19 08:02:14 2023 +0200

    AVRO-3819: Centralize system properties that limit allocations (#2432)
---
 .../avro/src/main/java/org/apache/avro/Schema.java |   3 +-
 .../java/org/apache/avro/SystemLimitException.java | 190 +++++++
 .../java/org/apache/avro/io/BinaryDecoder.java     |  74 +--
 .../org/apache/avro/io/DirectBinaryDecoder.java    |  24 +-
 .../src/main/java/org/apache/avro/util/Utf8.java   |  33 +-
 .../src/test/java/org/apache/avro/TestFixed.java   |  23 +-
 .../org/apache/avro/TestSystemLimitException.java  | 164 ++++++
 .../java/org/apache/avro/io/TestBinaryDecoder.java | 593 +++++++++++++--------
 .../test/java/org/apache/avro/util/TestUtf8.java   |  28 +-
 9 files changed, 811 insertions(+), 321 deletions(-)

diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java 
b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
index 97513f52f..ad6bf0ca6 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
@@ -1287,8 +1287,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
 
     public FixedSchema(Name name, String doc, int size) {
       super(Type.FIXED, name, doc);
-      if (size < 0)
-        throw new IllegalArgumentException("Invalid fixed size: " + size);
+      SystemLimitException.checkMaxBytesLength(size);
       this.size = size;
     }
 
diff --git 
a/lang/java/avro/src/main/java/org/apache/avro/SystemLimitException.java 
b/lang/java/avro/src/main/java/org/apache/avro/SystemLimitException.java
new file mode 100644
index 000000000..a96f812d8
--- /dev/null
+++ b/lang/java/avro/src/main/java/org/apache/avro/SystemLimitException.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro;
+
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrown to prevent making large allocations when reading potentially
+ * pathological input data from an untrusted source.
+ * <p/>
+ * The following system properties can be set to limit the size of bytes,
+ * strings and collection types to be allocated:
+ * <ul>
+ * <li><tt>org.apache.avro.limits.bytes.maxLength</tt> limits the maximum
+ * size of <tt>bytes</tt> types.</li>
+ * <li><tt>org.apache.avro.limits.collectionItems.maxLength</tt> limits the
+ * maximum number of <tt>map</tt> and <tt>array</tt> items that can be read in
+ * a single sequence.</li>
+ * <li><tt>org.apache.avro.limits.string.maxLength</tt> limits the maximum
+ * size of <tt>string</tt> types.</li>
+ * </ul>
+ *
+ * The default is to permit sizes up to {@link #MAX_ARRAY_VM_LIMIT}.
+ */
+public class SystemLimitException extends AvroRuntimeException {
+
+  /**
+   * The maximum length of array to allocate (unless necessary). Some VMs 
reserve
+   * some header words in an array. Attempts to allocate larger arrays may 
result
+   * in {@code OutOfMemoryError: Requested array size exceeds VM limit}
+   *
+   * @see <a href="https://bugs.openjdk.org/browse/JDK-8246725">JDK-8246725</a>
+   */
+  // VisibleForTesting
+  static final int MAX_ARRAY_VM_LIMIT = Integer.MAX_VALUE - 8;
+
+  public static final String MAX_BYTES_LENGTH_PROPERTY = 
"org.apache.avro.limits.bytes.maxLength";
+  public static final String MAX_COLLECTION_LENGTH_PROPERTY = 
"org.apache.avro.limits.collectionItems.maxLength";
+  public static final String MAX_STRING_LENGTH_PROPERTY = 
"org.apache.avro.limits.string.maxLength";
+
+  private static int maxBytesLength = MAX_ARRAY_VM_LIMIT;
+  private static int maxCollectionLength = MAX_ARRAY_VM_LIMIT;
+  private static int maxStringLength = MAX_ARRAY_VM_LIMIT;
+
+  static {
+    resetLimits();
+  }
+
+  public SystemLimitException(String message) {
+    super(message);
+  }
+
+  /**
+   * Get an integer value stored in a system property, used to configure the
+   * system behaviour of decoders
+   *
+   * @param property     The system property to fetch
+   * @param defaultValue The value to use if the system property is not present
+   *                     or cannot be parsed as an unsigned int
+   * @return The value from the system property
+   */
+  private static int getLimitFromProperty(String property, int defaultValue) {
+    String o = System.getProperty(property);
+    int i = defaultValue;
+    if (o != null) {
+      try {
+        i = Integer.parseUnsignedInt(o);
+      } catch (NumberFormatException nfe) {
+        LoggerFactory.getLogger(SystemLimitException.class).warn("Could not 
parse property " + property + ": " + o,
+            nfe);
+      }
+    }
+    return i;
+  }
+
+  /**
+   * Check to ensure that reading the bytes is within the specified limits.
+   *
+   * @param length The proposed size of the bytes to read
+   * @return The size of the bytes if and only if it is within the limit and
+   *         non-negative.
+   * @throws UnsupportedOperationException if reading the datum would allocate 
a
+   *                                       collection that the Java VM would be
+   *                                       unable to handle
+   * @throws SystemLimitException          if the decoding should fail because 
it
+   *                                       would otherwise result in an 
allocation
+   *                                       exceeding the set limit
+   * @throws AvroRuntimeException          if the length is negative
+   */
+  public static int checkMaxBytesLength(long length) {
+    if (length < 0) {
+      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
length);
+    }
+    if (length > MAX_ARRAY_VM_LIMIT) {
+      throw new UnsupportedOperationException(
+          "Cannot read arrays longer than " + MAX_ARRAY_VM_LIMIT + " bytes in 
Java library");
+    }
+    if (length > maxBytesLength) {
+      throw new SystemLimitException("Bytes length " + length + " exceeds 
maximum allowed");
+    }
+    return (int) length;
+  }
+
+  /**
+   * Check to ensure that reading the specified number of items remains within 
the
+   * specified limits.
+   *
+   * @param existing The number of items already read in the collection
+   * @param items    The next number of items to read. In normal usage, this is
+   *                 always a positive, permitted value. Negative and zero 
values
+   *                 have a special meaning in Avro decoding.
+   * @return The total number of items in the collection if and only if it is
+   *         within the limit and non-negative.
+   * @throws UnsupportedOperationException if reading the items would allocate 
a
+   *                                       collection that the Java VM would be
+   *                                       unable to handle
+   * @throws SystemLimitException          if the decoding should fail because 
it
+   *                                       would otherwise result in an 
allocation
+   *                                       exceeding the set limit
+   * @throws AvroRuntimeException          if the length is negative
+   */
+  public static int checkMaxCollectionLength(long existing, long items) {
+    long length = existing + items;
+    if (existing < 0) {
+      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
existing);
+    }
+    if (items < 0) {
+      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
items);
+    }
+    if (length > MAX_ARRAY_VM_LIMIT || length < existing) {
+      throw new UnsupportedOperationException(
+          "Cannot read collections larger than " + MAX_ARRAY_VM_LIMIT + " 
items in Java library");
+    }
+    if (length > maxCollectionLength) {
+      throw new SystemLimitException("Collection length " + length + " exceeds 
maximum allowed");
+    }
+    return (int) length;
+  }
+
+  /**
+   * Check to ensure that reading the string size is within the specified 
limits.
+   *
+   * @param length The proposed size of the string to read
+   * @return The size of the string if and only if it is within the limit and
+   *         non-negative.
+   * @throws UnsupportedOperationException if reading the string would allocate
+   *                                       an array that the Java VM would be
+   *                                       unable to handle
+   * @throws SystemLimitException          if the decoding should fail because 
it
+   *                                       would otherwise result in an 
allocation
+   *                                       exceeding the set limit
+   * @throws AvroRuntimeException          if the length is negative
+   */
+  public static int checkMaxStringLength(long length) {
+    if (length < 0) {
+      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
length);
+    }
+    if (length > MAX_ARRAY_VM_LIMIT) {
+      throw new UnsupportedOperationException("Cannot read strings longer than 
" + MAX_ARRAY_VM_LIMIT + " bytes");
+    }
+    if (length > maxStringLength) {
+      throw new SystemLimitException("String length " + length + " exceeds 
maximum allowed");
+    }
+    return (int) length;
+  }
+
+  /** Reread the limits from the system properties. */
+  // VisibleForTesting
+  static void resetLimits() {
+    maxBytesLength = getLimitFromProperty(MAX_BYTES_LENGTH_PROPERTY, 
MAX_ARRAY_VM_LIMIT);
+    maxCollectionLength = getLimitFromProperty(MAX_COLLECTION_LENGTH_PROPERTY, 
MAX_ARRAY_VM_LIMIT);
+    maxStringLength = getLimitFromProperty(MAX_STRING_LENGTH_PROPERTY, 
MAX_ARRAY_VM_LIMIT);
+  }
+}
diff --git a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java 
b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java
index 12c6e080d..3fa675d79 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/BinaryDecoder.java
@@ -26,8 +26,8 @@ import java.util.Arrays;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.InvalidNumberEncodingException;
+import org.apache.avro.SystemLimitException;
 import org.apache.avro.util.Utf8;
-import org.slf4j.LoggerFactory;
 
 /**
  * An {@link Decoder} for binary-format data.
@@ -39,27 +39,20 @@ import org.slf4j.LoggerFactory;
  * can be accessed by inputStream().remaining(), if the BinaryDecoder is not
  * 'direct'.
  * <p/>
- * To prevent this class from making large allocations when handling 
potentially
- * pathological input data, set Java properties
- * <tt>org.apache.avro.limits.string.maxLength</tt> and
- * <tt>org.apache.avro.limits.bytes.maxLength</tt> before instantiating this
- * class to limit the maximum sizes of <tt>string</tt> and <tt>bytes</tt> types
- * handled. The default is to permit sizes up to Java's maximum array length.
  *
  * @see Encoder
+ * @see SystemLimitException
  */
 
 public class BinaryDecoder extends Decoder {
 
   /**
-   * The maximum size of array to allocate. Some VMs reserve some header words 
in
-   * an array. Attempts to allocate larger arrays may result in 
OutOfMemoryError:
-   * Requested array size exceeds VM limit
+   * When reading a collection (MAP or ARRAY), this keeps track of the number 
of
+   * elements to ensure that the
+   * {@link SystemLimitException#checkMaxCollectionLength} constraint is
+   * respected.
    */
-  static final long MAX_ARRAY_SIZE = (long) Integer.MAX_VALUE - 8L;
-
-  private static final String MAX_BYTES_LENGTH_PROPERTY = 
"org.apache.avro.limits.bytes.maxLength";
-  protected final int maxBytesLength;
+  private long collectionCount = 0L;
 
   private ByteSource source = null;
   // we keep the buffer and its state variables in this class and not in a
@@ -99,17 +92,6 @@ public class BinaryDecoder extends Decoder {
   /** protected constructor for child classes */
   protected BinaryDecoder() {
     super();
-    String o = System.getProperty(MAX_BYTES_LENGTH_PROPERTY);
-    int i = Integer.MAX_VALUE;
-    if (o != null) {
-      try {
-        i = Integer.parseUnsignedInt(o);
-      } catch (NumberFormatException nfe) {
-        LoggerFactory.getLogger(BinaryDecoder.class)
-            .warn("Could not parse property " + MAX_BYTES_LENGTH_PROPERTY + ": 
" + o, nfe);
-      }
-    }
-    maxBytesLength = i;
   }
 
   BinaryDecoder(InputStream in, int bufferSize) {
@@ -300,17 +282,11 @@ public class BinaryDecoder extends Decoder {
 
   @Override
   public Utf8 readString(Utf8 old) throws IOException {
-    long length = readLong();
-    if (length > MAX_ARRAY_SIZE) {
-      throw new UnsupportedOperationException("Cannot read strings longer than 
" + MAX_ARRAY_SIZE + " bytes");
-    }
-    if (length < 0L) {
-      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
length);
-    }
+    int length = SystemLimitException.checkMaxStringLength(readLong());
     Utf8 result = (old != null ? old : new Utf8());
-    result.setByteLength((int) length);
-    if (0L != length) {
-      doReadBytes(result.getBytes(), 0, (int) length);
+    result.setByteLength(length);
+    if (0 != length) {
+      doReadBytes(result.getBytes(), 0, length);
     }
     return result;
   }
@@ -329,17 +305,7 @@ public class BinaryDecoder extends Decoder {
 
   @Override
   public ByteBuffer readBytes(ByteBuffer old) throws IOException {
-    long length = readLong();
-    if (length > MAX_ARRAY_SIZE) {
-      throw new UnsupportedOperationException(
-          "Cannot read arrays longer than " + MAX_ARRAY_SIZE + " bytes in Java 
library");
-    }
-    if (length > maxBytesLength) {
-      throw new AvroRuntimeException("Bytes length " + length + " exceeds 
maximum allowed");
-    }
-    if (length < 0L) {
-      throw new AvroRuntimeException("Malformed data. Length is negative: " + 
length);
-    }
+    int length = SystemLimitException.checkMaxBytesLength(readLong());
     final ByteBuffer result;
     if (old != null && length <= old.capacity()) {
       result = old;
@@ -444,7 +410,6 @@ public class BinaryDecoder extends Decoder {
    * @return Zero if there are no more items to skip and end of array/map is
    *         reached. Positive number if some items are found that cannot be
    *         skipped and the client needs to skip them individually.
-   *
    * @throws IOException If the first byte cannot be read for any reason other
    *                     than the end of the file, if the input stream has been
    *                     closed, or if some other I/O error occurs.
@@ -461,12 +426,15 @@ public class BinaryDecoder extends Decoder {
 
   @Override
   public long readArrayStart() throws IOException {
-    return doReadItemCount();
+    collectionCount = SystemLimitException.checkMaxCollectionLength(0L, 
doReadItemCount());
+    return collectionCount;
   }
 
   @Override
   public long arrayNext() throws IOException {
-    return doReadItemCount();
+    long length = doReadItemCount();
+    collectionCount = 
SystemLimitException.checkMaxCollectionLength(collectionCount, length);
+    return length;
   }
 
   @Override
@@ -476,12 +444,15 @@ public class BinaryDecoder extends Decoder {
 
   @Override
   public long readMapStart() throws IOException {
-    return doReadItemCount();
+    collectionCount = SystemLimitException.checkMaxCollectionLength(0L, 
doReadItemCount());
+    return collectionCount;
   }
 
   @Override
   public long mapNext() throws IOException {
-    return doReadItemCount();
+    long length = doReadItemCount();
+    collectionCount = 
SystemLimitException.checkMaxCollectionLength(collectionCount, length);
+    return length;
   }
 
   @Override
@@ -933,7 +904,6 @@ public class BinaryDecoder extends Decoder {
   /**
    * This byte source is special. It will avoid copying data by using the 
source's
    * byte[] as a buffer in the decoder.
-   *
    */
   private static class ByteArrayByteSource extends ByteSource {
     private static final int MIN_SIZE = 16;
diff --git 
a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java 
b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java
index 1e01f6f96..6f07b13ee 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/io/DirectBinaryDecoder.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 
-import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.InvalidNumberEncodingException;
+import org.apache.avro.SystemLimitException;
 import org.apache.avro.util.ByteBufferInputStream;
 
 /**
@@ -39,8 +39,7 @@ class DirectBinaryDecoder extends BinaryDecoder {
   private InputStream in;
 
   private class ByteReader {
-    public ByteBuffer read(ByteBuffer old, long length) throws IOException {
-      this.checkLength(length);
+    public ByteBuffer read(ByteBuffer old, int length) throws IOException {
       final ByteBuffer result;
       if (old != null && length <= old.capacity()) {
         result = old;
@@ -52,19 +51,6 @@ class DirectBinaryDecoder extends BinaryDecoder {
       result.limit((int) length);
       return result;
     }
-
-    protected final void checkLength(long length) {
-      if (length < 0L) {
-        throw new AvroRuntimeException("Malformed data. Length is negative: " 
+ length);
-      }
-      if (length > MAX_ARRAY_SIZE) {
-        throw new UnsupportedOperationException(
-            "Cannot read arrays longer than " + MAX_ARRAY_SIZE + " bytes in 
Java library");
-      }
-      if (length > maxBytesLength) {
-        throw new AvroRuntimeException("Bytes length " + length + " exceeds 
maximum allowed");
-      }
-    }
   }
 
   private class ReuseByteReader extends ByteReader {
@@ -75,15 +61,13 @@ class DirectBinaryDecoder extends BinaryDecoder {
     }
 
     @Override
-    public ByteBuffer read(ByteBuffer old, long length) throws IOException {
-      this.checkLength(length);
+    public ByteBuffer read(ByteBuffer old, int length) throws IOException {
       if (old != null) {
         return super.read(old, length);
       } else {
         return bbi.readBuffer((int) length);
       }
     }
-
   }
 
   private ByteReader byteReader;
@@ -172,7 +156,7 @@ class DirectBinaryDecoder extends BinaryDecoder {
   @Override
   public ByteBuffer readBytes(ByteBuffer old) throws IOException {
     long length = readLong();
-    return byteReader.read(old, length);
+    return byteReader.read(old, 
SystemLimitException.checkMaxBytesLength(length));
   }
 
   @Override
diff --git a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java 
b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
index f54b6e206..9238fd78c 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/Utf8.java
@@ -24,9 +24,8 @@ import java.io.ObjectOutput;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
-import org.apache.avro.AvroRuntimeException;
+import org.apache.avro.SystemLimitException;
 import org.apache.avro.io.BinaryData;
-import org.slf4j.LoggerFactory;
 
 /**
  * A Utf8 string. Unlike {@link String}, instances are mutable. This is more
@@ -34,22 +33,8 @@ import org.slf4j.LoggerFactory;
  * as a single instance may be reused.
  */
 public class Utf8 implements Comparable<Utf8>, CharSequence, Externalizable {
-  private static final String MAX_LENGTH_PROPERTY = 
"org.apache.avro.limits.string.maxLength";
-  private static final int MAX_LENGTH;
-  private static final byte[] EMPTY = new byte[0];
 
-  static {
-    String o = System.getProperty(MAX_LENGTH_PROPERTY);
-    int i = Integer.MAX_VALUE;
-    if (o != null) {
-      try {
-        i = Integer.parseUnsignedInt(o);
-      } catch (NumberFormatException nfe) {
-        LoggerFactory.getLogger(Utf8.class).warn("Could not parse property " + 
MAX_LENGTH_PROPERTY + ": " + o, nfe);
-      }
-    }
-    MAX_LENGTH = i;
-  }
+  private static final byte[] EMPTY = new byte[0];
 
   private byte[] bytes;
   private int hash;
@@ -63,7 +48,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence, 
Externalizable {
   public Utf8(String string) {
     byte[] bytes = getBytesFor(string);
     int length = bytes.length;
-    checkLength(length);
+    SystemLimitException.checkMaxStringLength(length);
     this.bytes = bytes;
     this.length = length;
     this.string = string;
@@ -78,7 +63,7 @@ public class Utf8 implements Comparable<Utf8>, CharSequence, 
Externalizable {
 
   public Utf8(byte[] bytes) {
     int length = bytes.length;
-    checkLength(length);
+    SystemLimitException.checkMaxStringLength(length);
     this.bytes = bytes;
     this.length = length;
   }
@@ -121,7 +106,7 @@ public class Utf8 implements Comparable<Utf8>, 
CharSequence, Externalizable {
    * length does not change, as this also clears the cached String.
    */
   public Utf8 setByteLength(int newLength) {
-    checkLength(newLength);
+    SystemLimitException.checkMaxStringLength(newLength);
     if (this.bytes.length < newLength) {
       this.bytes = Arrays.copyOf(this.bytes, newLength);
     }
@@ -135,7 +120,7 @@ public class Utf8 implements Comparable<Utf8>, 
CharSequence, Externalizable {
   public Utf8 set(String string) {
     byte[] bytes = getBytesFor(string);
     int length = bytes.length;
-    checkLength(length);
+    SystemLimitException.checkMaxStringLength(length);
     this.bytes = bytes;
     this.length = length;
     this.string = string;
@@ -215,12 +200,6 @@ public class Utf8 implements Comparable<Utf8>, 
CharSequence, Externalizable {
     return toString().subSequence(start, end);
   }
 
-  private static void checkLength(int length) {
-    if (length > MAX_LENGTH) {
-      throw new AvroRuntimeException("String length " + length + " exceeds 
maximum allowed");
-    }
-  }
-
   /** Gets the UTF-8 bytes for a String */
   public static byte[] getBytesFor(String str) {
     return str.getBytes(StandardCharsets.UTF_8);
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java 
b/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java
index a9f78f168..f35c62d7a 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java
@@ -18,19 +18,32 @@
 
 package org.apache.avro;
 
-import org.junit.Assert;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
 
 public class TestFixed {
 
   @Test
-  public void testFixedDefaultValueDrop() {
+  void fixedDefaultValueDrop() {
     Schema md5 = SchemaBuilder.builder().fixed("MD5").size(16);
     Schema frec = 
SchemaBuilder.builder().record("test").fields().name("hash").type(md5).withDefault(new
 byte[16])
         .endRecord();
     Schema.Field field = frec.getField("hash");
-    Assert.assertNotNull(field.defaultVal());
-    Assert.assertArrayEquals(new byte[16], (byte[]) field.defaultVal());
+    assertNotNull(field.defaultVal());
+    assertArrayEquals(new byte[16], (byte[]) field.defaultVal());
+  }
+
+  @Test
+  void fixedLengthOutOfLimit() {
+    Exception ex = assertThrows(UnsupportedOperationException.class,
+        () -> Schema.createFixed("oversize", "doc", "space", 
Integer.MAX_VALUE));
+    assertEquals(TestSystemLimitException.ERROR_VM_LIMIT_BYTES, 
ex.getMessage());
   }
 
+  @Test
+  void fixedNegativeLength() {
+    Exception ex = assertThrows(AvroRuntimeException.class, () -> 
Schema.createFixed("negative", "doc", "space", -1));
+    assertEquals(TestSystemLimitException.ERROR_NEGATIVE, ex.getMessage());
+  }
 }
diff --git 
a/lang/java/avro/src/test/java/org/apache/avro/TestSystemLimitException.java 
b/lang/java/avro/src/test/java/org/apache/avro/TestSystemLimitException.java
new file mode 100644
index 000000000..0da391795
--- /dev/null
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSystemLimitException.java
@@ -0,0 +1,164 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro;
+
+import static org.apache.avro.SystemLimitException.*;
+import static org.junit.jupiter.api.Assertions.*;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.function.Function;
+
+public class TestSystemLimitException {
+
+  /** Delegated here for package visibility. */
+  public static final int MAX_ARRAY_VM_LIMIT = 
SystemLimitException.MAX_ARRAY_VM_LIMIT;
+
+  public static final String ERROR_NEGATIVE = "Malformed data. Length is 
negative: -1";
+  public static final String ERROR_VM_LIMIT_BYTES = "Cannot read arrays longer 
than " + MAX_ARRAY_VM_LIMIT
+      + " bytes in Java library";
+  public static final String ERROR_VM_LIMIT_COLLECTION = "Cannot read 
collections larger than " + MAX_ARRAY_VM_LIMIT
+      + " items in Java library";
+  public static final String ERROR_VM_LIMIT_STRING = "Cannot read strings 
longer than " + MAX_ARRAY_VM_LIMIT + " bytes";
+
+  /** Delegated here for package visibility. */
+  public static void resetLimits() {
+    SystemLimitException.resetLimits();
+  }
+
+  @AfterEach
+  void reset() {
+    System.clearProperty(MAX_BYTES_LENGTH_PROPERTY);
+    System.clearProperty(MAX_COLLECTION_LENGTH_PROPERTY);
+    System.clearProperty(MAX_STRING_LENGTH_PROPERTY);
+    resetLimits();
+  }
+
+  /**
+   * A helper method that tests the consistent limit handling from system
+   * properties.
+   *
+   * @param f                The function to be tested.
+   * @param sysProperty      The system property used to control the custom 
limit.
+   * @param errorVmLimit     The error message used when the property would be
+   *                         over the VM limit.
+   * @param errorCustomLimit The error message used when the property would be
+   *                         over the custom limit of 1000.
+   */
+  void helpCheckSystemLimits(Function<Long, Integer> f, String sysProperty, 
String errorVmLimit,
+      String errorCustomLimit) {
+    // Correct values pass through
+    assertEquals(0, f.apply(0L));
+    assertEquals(1024, f.apply(1024L));
+    assertEquals(MAX_ARRAY_VM_LIMIT, f.apply((long) MAX_ARRAY_VM_LIMIT));
+
+    // Values that exceed the default system limits throw exceptions
+    Exception ex = assertThrows(UnsupportedOperationException.class, () -> 
f.apply(Long.MAX_VALUE));
+    assertEquals(errorVmLimit, ex.getMessage());
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
f.apply((long) MAX_ARRAY_VM_LIMIT + 1));
+    assertEquals(errorVmLimit, ex.getMessage());
+    ex = assertThrows(AvroRuntimeException.class, () -> f.apply(-1L));
+    assertEquals(ERROR_NEGATIVE, ex.getMessage());
+
+    // Setting the system property to provide a custom limit.
+    System.setProperty(sysProperty, Long.toString(1000L));
+    resetLimits();
+
+    // Correct values pass through
+    assertEquals(0, f.apply(0L));
+    assertEquals(102, f.apply(102L));
+
+    // Values that exceed the custom system limits throw exceptions
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
f.apply((long) MAX_ARRAY_VM_LIMIT + 1));
+    assertEquals(errorVmLimit, ex.getMessage());
+    ex = assertThrows(SystemLimitException.class, () -> f.apply(1024L));
+    assertEquals(errorCustomLimit, ex.getMessage());
+    ex = assertThrows(AvroRuntimeException.class, () -> f.apply(-1L));
+    assertEquals(ERROR_NEGATIVE, ex.getMessage());
+  }
+
+  @Test
+  void testCheckMaxBytesLength() {
+    helpCheckSystemLimits(SystemLimitException::checkMaxBytesLength, 
MAX_BYTES_LENGTH_PROPERTY, ERROR_VM_LIMIT_BYTES,
+        "Bytes length 1024 exceeds maximum allowed");
+  }
+
+  @Test
+  void testCheckMaxCollectionLengthFromZero() {
+    helpCheckSystemLimits(l -> checkMaxCollectionLength(0L, l), 
MAX_COLLECTION_LENGTH_PROPERTY,
+        ERROR_VM_LIMIT_COLLECTION, "Collection length 1024 exceeds maximum 
allowed");
+  }
+
+  @Test
+  void testCheckMaxStringLength() {
+    helpCheckSystemLimits(SystemLimitException::checkMaxStringLength, 
MAX_STRING_LENGTH_PROPERTY, ERROR_VM_LIMIT_STRING,
+        "String length 1024 exceeds maximum allowed");
+  }
+
+  @Test
+  void testCheckMaxCollectionLengthFromNonZero() {
+    // Correct values pass through
+    assertEquals(10, checkMaxCollectionLength(10L, 0L));
+    assertEquals(MAX_ARRAY_VM_LIMIT, checkMaxCollectionLength(10L, 
MAX_ARRAY_VM_LIMIT - 10L));
+    assertEquals(MAX_ARRAY_VM_LIMIT, 
checkMaxCollectionLength(MAX_ARRAY_VM_LIMIT - 10L, 10L));
+
+    // Values that exceed the default system limits throw exceptions
+    Exception ex = assertThrows(UnsupportedOperationException.class,
+        () -> checkMaxCollectionLength(10L, MAX_ARRAY_VM_LIMIT - 9L));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+    ex = assertThrows(UnsupportedOperationException.class,
+        () -> checkMaxCollectionLength(SystemLimitException.MAX_ARRAY_VM_LIMIT 
- 9L, 10L));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(10L, Long.MAX_VALUE - 10L));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(Long.MAX_VALUE - 10L, 10L));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Overflow that adds to negative
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(10L, Long.MAX_VALUE));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(Long.MAX_VALUE, 10L));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    ex = assertThrows(AvroRuntimeException.class, () -> 
checkMaxCollectionLength(10L, -1L));
+    assertEquals(ERROR_NEGATIVE, ex.getMessage());
+    ex = assertThrows(AvroRuntimeException.class, () -> 
checkMaxCollectionLength(-1L, 10L));
+    assertEquals(ERROR_NEGATIVE, ex.getMessage());
+
+    // Setting the system property to provide a custom limit.
+    System.setProperty(MAX_COLLECTION_LENGTH_PROPERTY, Long.toString(1000L));
+    resetLimits();
+
+    // Correct values pass through
+    assertEquals(10, checkMaxCollectionLength(10L, 0L));
+    assertEquals(102, checkMaxCollectionLength(10L, 92L));
+    assertEquals(102, checkMaxCollectionLength(92L, 10L));
+
+    // Values that exceed the custom system limits throw exceptions
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(MAX_ARRAY_VM_LIMIT, 1));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+    ex = assertThrows(UnsupportedOperationException.class, () -> 
checkMaxCollectionLength(1, MAX_ARRAY_VM_LIMIT));
+    assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    ex = assertThrows(SystemLimitException.class, () -> 
checkMaxCollectionLength(999, 25));
+    assertEquals("Collection length 1024 exceeds maximum allowed", 
ex.getMessage());
+    ex = assertThrows(SystemLimitException.class, () -> 
checkMaxCollectionLength(25, 999));
+    assertEquals("Collection length 1024 exceeds maximum allowed", 
ex.getMessage());
+  }
+}
diff --git 
a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java 
b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java
index fe405cfb9..6010fc9c6 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/io/TestBinaryDecoder.java
@@ -17,56 +17,49 @@
  */
 package org.apache.avro.io;
 
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
+import org.apache.avro.SystemLimitException;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.avro.util.ByteBufferInputStream;
 import org.apache.avro.util.ByteBufferOutputStream;
 import org.apache.avro.util.RandomData;
 import org.apache.avro.util.Utf8;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(Parameterized.class)
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import static org.apache.avro.TestSystemLimitException.*;
+
 public class TestBinaryDecoder {
   // prime number buffer size so that looping tests hit the buffer edge
   // at different points in the loop.
   DecoderFactory factory = new 
DecoderFactory().configureDecoderBufferSize(521);
-  private boolean useDirect = false;
-  static EncoderFactory e_factory = EncoderFactory.get();
 
-  public TestBinaryDecoder(boolean useDirect) {
-    this.useDirect = useDirect;
-  }
-
-  @Parameters
-  public static Collection<Object[]> data() {
-    return Arrays.asList(new Object[][] { { true }, { false }, });
-  }
+  static EncoderFactory e_factory = EncoderFactory.get();
 
-  private Decoder newDecoderWithNoData() {
-    return newDecoder(new byte[0]);
+  private Decoder newDecoderWithNoData(boolean useDirect) {
+    return newDecoder(new byte[0], useDirect);
   }
 
-  private BinaryDecoder newDecoder(byte[] bytes, int start, int len) {
-    return this.newDecoder(bytes, start, len, null);
+  private BinaryDecoder newDecoder(byte[] bytes, int start, int len, boolean 
useDirect) {
+    return this.newDecoder(bytes, start, len, null, useDirect);
   }
 
-  private BinaryDecoder newDecoder(byte[] bytes, int start, int len, 
BinaryDecoder reuse) {
+  private BinaryDecoder newDecoder(byte[] bytes, int start, int len, 
BinaryDecoder reuse, boolean useDirect) {
     if (useDirect) {
       final ByteArrayInputStream input = new ByteArrayInputStream(bytes, 
start, len);
       return factory.directBinaryDecoder(input, reuse);
@@ -75,11 +68,11 @@ public class TestBinaryDecoder {
     }
   }
 
-  private BinaryDecoder newDecoder(InputStream in) {
-    return this.newDecoder(in, null);
+  private BinaryDecoder newDecoder(InputStream in, boolean useDirect) {
+    return this.newDecoder(in, null, useDirect);
   }
 
-  private BinaryDecoder newDecoder(InputStream in, BinaryDecoder reuse) {
+  private BinaryDecoder newDecoder(InputStream in, BinaryDecoder reuse, 
boolean useDirect) {
     if (useDirect) {
       return factory.directBinaryDecoder(in, reuse);
     } else {
@@ -87,67 +80,93 @@ public class TestBinaryDecoder {
     }
   }
 
-  private BinaryDecoder newDecoder(byte[] bytes, BinaryDecoder reuse) {
-    if (this.useDirect) {
+  private BinaryDecoder newDecoder(byte[] bytes, BinaryDecoder reuse, boolean 
useDirect) {
+    if (useDirect) {
       return this.factory.directBinaryDecoder(new ByteArrayInputStream(bytes), 
reuse);
     } else {
       return factory.binaryDecoder(bytes, reuse);
     }
   }
 
-  private BinaryDecoder newDecoder(byte[] bytes) {
-    return this.newDecoder(bytes, null);
+  private BinaryDecoder newDecoder(byte[] bytes, boolean useDirect) {
+    return this.newDecoder(bytes, null, useDirect);
+  }
+
+  /**
+   * Create a decoder for simulating reading corrupt, unexpected or 
out-of-bounds
+   * data.
+   *
+   * @return a {@link org.apache.avro.io.BinaryDecoder} that has been 
initialized
+   *         on a byte array containing the sequence of encoded longs in order.
+   */
+  private BinaryDecoder newDecoder(boolean useDirect, long... values) throws 
IOException {
+    try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+      BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
+      for (long v : values)
+        encoder.writeLong(v);
+      encoder.flush();
+      return newDecoder(baos.toByteArray(), useDirect);
+    }
   }
 
   /** Verify EOFException throw at EOF */
 
-  @Test(expected = EOFException.class)
-  public void testEOFBoolean() throws IOException {
-    newDecoderWithNoData().readBoolean();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofBoolean(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readBoolean());
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFInt() throws IOException {
-    newDecoderWithNoData().readInt();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofInt(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readInt());
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFLong() throws IOException {
-    newDecoderWithNoData().readLong();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofLong(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readLong());
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFFloat() throws IOException {
-    newDecoderWithNoData().readFloat();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofFloat(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readFloat());
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFDouble() throws IOException {
-    newDecoderWithNoData().readDouble();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofDouble(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readDouble());
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFBytes() throws IOException {
-    newDecoderWithNoData().readBytes(null);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofBytes(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readBytes(null));
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFString() throws IOException {
-    newDecoderWithNoData().readString(new Utf8("a"));
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofString(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readString(new Utf8("a")));
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFFixed() throws IOException {
-    newDecoderWithNoData().readFixed(new byte[1]);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofFixed(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readFixed(new byte[1]));
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOFEnum() throws IOException {
-    newDecoderWithNoData().readEnum();
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eofEnum(boolean useDirect) {
+    Assertions.assertThrows(EOFException.class, () -> 
newDecoderWithNoData(useDirect).readEnum());
   }
 
   @Test
-  public void testReuse() throws IOException {
+  void reuse() throws IOException {
     ByteBufferOutputStream bbo1 = new ByteBufferOutputStream();
     ByteBufferOutputStream bbo2 = new ByteBufferOutputStream();
     byte[] b1 = new byte[] { 1, 2 };
@@ -162,11 +181,11 @@ public class TestBinaryDecoder {
 
     DirectBinaryDecoder d = new DirectBinaryDecoder(new 
ByteBufferInputStream(bbo1.getBufferList()));
     ByteBuffer bb1 = d.readBytes(null);
-    Assert.assertEquals(b1.length, bb1.limit() - bb1.position());
+    Assertions.assertEquals(b1.length, bb1.limit() - bb1.position());
 
     d.configure(new ByteBufferInputStream(bbo2.getBufferList()));
     ByteBuffer bb2 = d.readBytes(null);
-    Assert.assertEquals(b1.length, bb2.limit() - bb2.position());
+    Assertions.assertEquals(b1.length, bb2.limit() - bb2.position());
 
   }
 
@@ -175,7 +194,7 @@ public class TestBinaryDecoder {
   private static final int count = 200;
   private static final ArrayList<Object> records = new ArrayList<>(count);
 
-  @BeforeClass
+  @BeforeAll
   public static void generateData() throws IOException {
     int seed = (int) System.currentTimeMillis();
     // note some tests (testSkipping) rely on this explicitly
@@ -199,8 +218,9 @@ public class TestBinaryDecoder {
     data = baos.toByteArray();
   }
 
-  @Test
-  public void testDecodeFromSources() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void decodeFromSources(boolean useDirect) throws IOException {
     GenericDatumReader<Object> reader = new GenericDatumReader<>();
     reader.setSchema(schema);
 
@@ -208,81 +228,82 @@ public class TestBinaryDecoder {
     ByteArrayInputStream is2 = new ByteArrayInputStream(data);
     ByteArrayInputStream is3 = new ByteArrayInputStream(data);
 
-    Decoder fromInputStream = newDecoder(is);
-    Decoder fromArray = newDecoder(data);
+    Decoder fromInputStream = newDecoder(is, useDirect);
+    Decoder fromArray = newDecoder(data, useDirect);
 
     byte[] data2 = new byte[data.length + 30];
     Arrays.fill(data2, (byte) 0xff);
     System.arraycopy(data, 0, data2, 15, data.length);
 
-    Decoder fromOffsetArray = newDecoder(data2, 15, data.length);
+    Decoder fromOffsetArray = newDecoder(data2, 15, data.length, useDirect);
 
-    BinaryDecoder initOnInputStream = newDecoder(new byte[50], 0, 30);
-    initOnInputStream = newDecoder(is2, initOnInputStream);
-    BinaryDecoder initOnArray = this.newDecoder(is3, null);
-    initOnArray = this.newDecoder(data, initOnArray);
+    BinaryDecoder initOnInputStream = newDecoder(new byte[50], 0, 30, 
useDirect);
+    initOnInputStream = newDecoder(is2, initOnInputStream, useDirect);
+    BinaryDecoder initOnArray = this.newDecoder(is3, null, useDirect);
+    initOnArray = this.newDecoder(data, initOnArray, useDirect);
 
     for (Object datum : records) {
-      Assert.assertEquals("InputStream based BinaryDecoder result does not 
match", datum,
-          reader.read(null, fromInputStream));
-      Assert.assertEquals("Array based BinaryDecoder result does not match", 
datum, reader.read(null, fromArray));
-      Assert.assertEquals("offset Array based BinaryDecoder result does not 
match", datum,
-          reader.read(null, fromOffsetArray));
-      Assert.assertEquals("InputStream initialized BinaryDecoder result does 
not match", datum,
-          reader.read(null, initOnInputStream));
-      Assert.assertEquals("Array initialized BinaryDecoder result does not 
match", datum,
-          reader.read(null, initOnArray));
+      Assertions.assertEquals(datum, reader.read(null, fromInputStream),
+          "InputStream based BinaryDecoder result does not match");
+      Assertions.assertEquals(datum, reader.read(null, fromArray), "Array 
based BinaryDecoder result does not match");
+      Assertions.assertEquals(datum, reader.read(null, fromOffsetArray),
+          "offset Array based BinaryDecoder result does not match");
+      Assertions.assertEquals(datum, reader.read(null, initOnInputStream),
+          "InputStream initialized BinaryDecoder result does not match");
+      Assertions.assertEquals(datum, reader.read(null, initOnArray),
+          "Array initialized BinaryDecoder result does not match");
     }
   }
 
-  @Test
-  public void testInputStreamProxy() throws IOException {
-    BinaryDecoder d = newDecoder(data);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void inputStreamProxy(boolean useDirect) throws IOException {
+    BinaryDecoder d = newDecoder(data, useDirect);
     if (d != null) {
       BinaryDecoder bd = d;
       InputStream test = bd.inputStream();
       InputStream check = new ByteArrayInputStream(data);
       validateInputStreamReads(test, check);
-      bd = this.newDecoder(data, bd);
+      bd = this.newDecoder(data, bd, useDirect);
       test = bd.inputStream();
       check = new ByteArrayInputStream(data);
       validateInputStreamSkips(test, check);
       // with input stream sources
-      bd = newDecoder(new ByteArrayInputStream(data), bd);
+      bd = newDecoder(new ByteArrayInputStream(data), bd, useDirect);
       test = bd.inputStream();
       check = new ByteArrayInputStream(data);
       validateInputStreamReads(test, check);
-      bd = newDecoder(new ByteArrayInputStream(data), bd);
+      bd = newDecoder(new ByteArrayInputStream(data), bd, useDirect);
       test = bd.inputStream();
       check = new ByteArrayInputStream(data);
       validateInputStreamSkips(test, check);
     }
   }
 
-  @Test
-  public void testInputStreamProxyDetached() throws IOException {
-    Decoder d = newDecoder(data);
-    if (d instanceof BinaryDecoder) {
-      BinaryDecoder bd = (BinaryDecoder) d;
-      InputStream test = bd.inputStream();
-      InputStream check = new ByteArrayInputStream(data);
-      // detach input stream and decoder from old source
-      this.newDecoder(new byte[56]);
-      try (InputStream bad = bd.inputStream(); InputStream check2 = new 
ByteArrayInputStream(data)) {
-        validateInputStreamReads(test, check);
-        Assert.assertNotEquals(bad.read(), check2.read());
-      }
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void inputStreamProxyDetached(boolean useDirect) throws IOException {
+    BinaryDecoder bd = newDecoder(data, useDirect);
+
+    InputStream test = bd.inputStream();
+    InputStream check = new ByteArrayInputStream(data);
+    // detach input stream and decoder from old source
+    this.newDecoder(new byte[56], useDirect);
+    try (InputStream bad = bd.inputStream(); InputStream check2 = new 
ByteArrayInputStream(data)) {
+      validateInputStreamReads(test, check);
+      Assertions.assertNotEquals(bad.read(), check2.read());
     }
   }
 
-  @Test
-  public void testInputStreamPartiallyUsed() throws IOException {
-    BinaryDecoder bd = this.newDecoder(new ByteArrayInputStream(data));
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void inputStreamPartiallyUsed(boolean useDirect) throws IOException {
+    BinaryDecoder bd = this.newDecoder(new ByteArrayInputStream(data), 
useDirect);
     InputStream test = bd.inputStream();
     InputStream check = new ByteArrayInputStream(data);
     // triggers buffer fill if unused and tests isEnd()
     try {
-      Assert.assertFalse(bd.isEnd());
+      Assertions.assertFalse(bd.isEnd());
     } catch (UnsupportedOperationException e) {
       // this is ok if its a DirectBinaryDecoder.
       if (bd.getClass() != DirectBinaryDecoder.class) {
@@ -300,25 +321,28 @@ public class TestBinaryDecoder {
     while (true) {
       int t = test.read();
       int c = check.read();
-      Assert.assertEquals(c, t);
-      if (-1 == t)
+      Assertions.assertEquals(c, t);
+      if (-1 == t) {
         break;
+      }
       t = test.read(bt);
       c = check.read(bc);
-      Assert.assertEquals(c, t);
-      Assert.assertArrayEquals(bt, bc);
-      if (-1 == t)
+      Assertions.assertEquals(c, t);
+      Assertions.assertArrayEquals(bt, bc);
+      if (-1 == t) {
         break;
+      }
       t = test.read(bt, 1, 4);
       c = check.read(bc, 1, 4);
-      Assert.assertEquals(c, t);
-      Assert.assertArrayEquals(bt, bc);
-      if (-1 == t)
+      Assertions.assertEquals(c, t);
+      Assertions.assertArrayEquals(bt, bc);
+      if (-1 == t) {
         break;
+      }
     }
-    Assert.assertEquals(0, test.skip(5));
-    Assert.assertEquals(0, test.available());
-    Assert.assertFalse(test.getClass() != ByteArrayInputStream.class && 
test.markSupported());
+    Assertions.assertEquals(0, test.skip(5));
+    Assertions.assertEquals(0, test.available());
+    Assertions.assertFalse(test.getClass() != ByteArrayInputStream.class && 
test.markSupported());
     test.close();
   }
 
@@ -326,154 +350,300 @@ public class TestBinaryDecoder {
     while (true) {
       long t2 = test.skip(19);
       long c2 = check.skip(19);
-      Assert.assertEquals(c2, t2);
-      if (0 == t2)
+      Assertions.assertEquals(c2, t2);
+      if (0 == t2) {
         break;
+      }
     }
-    Assert.assertEquals(-1, test.read());
+    Assertions.assertEquals(-1, test.read());
   }
 
-  @Test
-  public void testBadIntEncoding() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void badIntEncoding(boolean useDirect) throws IOException {
     byte[] badint = new byte[5];
     Arrays.fill(badint, (byte) 0xff);
-    Decoder bd = this.newDecoder(badint);
+    Decoder bd = this.newDecoder(badint, useDirect);
     String message = "";
     try {
       bd.readInt();
     } catch (IOException ioe) {
       message = ioe.getMessage();
     }
-    Assert.assertEquals("Invalid int encoding", message);
+    Assertions.assertEquals("Invalid int encoding", message);
   }
 
-  @Test
-  public void testBadLongEncoding() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void badLongEncoding(boolean useDirect) throws IOException {
     byte[] badint = new byte[10];
     Arrays.fill(badint, (byte) 0xff);
-    Decoder bd = this.newDecoder(badint);
+    Decoder bd = this.newDecoder(badint, useDirect);
     String message = "";
     try {
       bd.readLong();
     } catch (IOException ioe) {
       message = ioe.getMessage();
     }
-    Assert.assertEquals("Invalid long encoding", message);
+    Assertions.assertEquals("Invalid long encoding", message);
   }
 
-  @Test
-  public void testNegativeStringLength() throws IOException {
-    byte[] bad = new byte[] { (byte) 1 };
-    Decoder bd = this.newDecoder(bad);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testStringNegativeLength(boolean useDirect) throws IOException {
+    Exception ex = Assertions.assertThrows(AvroRuntimeException.class, 
this.newDecoder(useDirect, -1L)::readString);
+    Assertions.assertEquals(ERROR_NEGATIVE, ex.getMessage());
+  }
 
-    Assert.assertThrows("Malformed data. Length is negative: -1", 
AvroRuntimeException.class, bd::readString);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testStringVmMaxSize(boolean useDirect) throws IOException {
+    Exception ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 1L)::readString);
+    Assertions.assertEquals(ERROR_VM_LIMIT_STRING, ex.getMessage());
   }
 
-  @Test
-  public void testStringMaxArraySize() throws IOException {
-    byte[] bad = new byte[10];
-    BinaryData.encodeLong(BinaryDecoder.MAX_ARRAY_SIZE + 1, bad, 0);
-    Decoder bd = this.newDecoder(bad);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testStringMaxCustom(boolean useDirect) throws IOException {
+    try {
+      System.setProperty(SystemLimitException.MAX_STRING_LENGTH_PROPERTY, 
Long.toString(128));
+      resetLimits();
+      Exception ex = Assertions.assertThrows(SystemLimitException.class, 
newDecoder(useDirect, 129)::readString);
+      Assertions.assertEquals("String length 129 exceeds maximum allowed", 
ex.getMessage());
+    } finally {
+      System.clearProperty(SystemLimitException.MAX_STRING_LENGTH_PROPERTY);
+      resetLimits();
+    }
+  }
 
-    Assert.assertThrows("Cannot read strings longer than " + 
BinaryDecoder.MAX_ARRAY_SIZE + " bytes",
-        UnsupportedOperationException.class, bd::readString);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testBytesNegativeLength(boolean useDirect) throws IOException {
+    Exception ex = Assertions.assertThrows(AvroRuntimeException.class,
+        () -> this.newDecoder(useDirect, -1).readBytes(null));
+    Assertions.assertEquals(ERROR_NEGATIVE, ex.getMessage());
   }
 
-  @Test
-  public void testNegativeBytesLength() throws IOException {
-    byte[] bad = new byte[] { (byte) 1 };
-    Decoder bd = this.newDecoder(bad);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testBytesVmMaxSize(boolean useDirect) throws IOException {
+    Exception ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 
1).readBytes(null));
+    Assertions.assertEquals(ERROR_VM_LIMIT_BYTES, ex.getMessage());
+  }
 
-    Assert.assertThrows("Malformed data. Length is negative: -1", 
AvroRuntimeException.class, () -> bd.readBytes(null));
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testBytesMaxCustom(boolean useDirect) throws IOException {
+    try {
+      System.setProperty(SystemLimitException.MAX_BYTES_LENGTH_PROPERTY, 
Long.toString(128));
+      resetLimits();
+      Exception ex = Assertions.assertThrows(SystemLimitException.class,
+          () -> newDecoder(useDirect, 129).readBytes(null));
+      Assertions.assertEquals("Bytes length 129 exceeds maximum allowed", 
ex.getMessage());
+    } finally {
+      System.clearProperty(SystemLimitException.MAX_BYTES_LENGTH_PROPERTY);
+      resetLimits();
+    }
   }
 
-  @Test
-  public void testBytesMaxArraySize() {
-    byte[] bad = new byte[10];
-    BinaryData.encodeLong(BinaryDecoder.MAX_ARRAY_SIZE + 1, bad, 0);
-    Decoder bd = this.newDecoder(bad);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testArrayVmMaxSize(boolean useDirect) throws IOException {
+    // At start
+    Exception ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 
1).readArrayStart());
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Next
+    ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 1).arrayNext());
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // One OK read followed by an overflow
+    Decoder bd = newDecoder(useDirect, MAX_ARRAY_VM_LIMIT - 100, 
Long.MAX_VALUE);
+    Assertions.assertEquals(MAX_ARRAY_VM_LIMIT - 100, bd.readArrayStart());
+    ex = Assertions.assertThrows(UnsupportedOperationException.class, 
bd::arrayNext);
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Two OK reads followed by going over the VM limit.
+    bd = newDecoder(useDirect, MAX_ARRAY_VM_LIMIT - 100, 100, 1);
+    Assertions.assertEquals(MAX_ARRAY_VM_LIMIT - 100, bd.readArrayStart());
+    Assertions.assertEquals(100, bd.arrayNext());
+    ex = Assertions.assertThrows(UnsupportedOperationException.class, 
bd::arrayNext);
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Two OK reads followed by going over the VM limit, where negative 
numbers are
+    // followed by the byte length of the items. For testing, the 999 values 
are
+    // read but ignored.
+    bd = newDecoder(useDirect, 100 - MAX_ARRAY_VM_LIMIT, 999, -100, 999, 1);
+    Assertions.assertEquals(MAX_ARRAY_VM_LIMIT - 100, bd.readArrayStart());
+    Assertions.assertEquals(100, bd.arrayNext());
+    ex = Assertions.assertThrows(UnsupportedOperationException.class, 
bd::arrayNext);
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testArrayMaxCustom(boolean useDirect) throws IOException {
+    try {
+      System.setProperty(SystemLimitException.MAX_COLLECTION_LENGTH_PROPERTY, 
Long.toString(128));
+      resetLimits();
+      Exception ex = 
Assertions.assertThrows(UnsupportedOperationException.class,
+          () -> newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 
1).readArrayStart());
+      Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+      // Two OK reads followed by going over the custom limit.
+      Decoder bd = newDecoder(useDirect, 118, 10, 1);
+      Assertions.assertEquals(118, bd.readArrayStart());
+      Assertions.assertEquals(10, bd.arrayNext());
+      ex = Assertions.assertThrows(SystemLimitException.class, bd::arrayNext);
+      Assertions.assertEquals("Collection length 129 exceeds maximum allowed", 
ex.getMessage());
+
+      // Two OK reads followed by going over the custom limit, where negative 
numbers are
+      // followed by the byte length of the items. For testing, the 999 values 
are
+      // read but ignored.
+      bd = newDecoder(useDirect, -118, 999, -10, 999, 1);
+      Assertions.assertEquals(118, bd.readArrayStart());
+      Assertions.assertEquals(10, bd.arrayNext());
+      ex = Assertions.assertThrows(SystemLimitException.class, bd::arrayNext);
+      Assertions.assertEquals("Collection length 129 exceeds maximum allowed", 
ex.getMessage());
 
-    Assert.assertThrows("Cannot read arrays longer than " + 
BinaryDecoder.MAX_ARRAY_SIZE + " bytes",
-        UnsupportedOperationException.class, () -> bd.readBytes(null));
+    } finally {
+      
System.clearProperty(SystemLimitException.MAX_COLLECTION_LENGTH_PROPERTY);
+      resetLimits();
+    }
   }
 
-  @Test
-  public void testBytesMaxLengthProperty() {
-    int maxLength = 128;
-    byte[] bad = new byte[10];
-    BinaryData.encodeLong(maxLength + 1, bad, 0);
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testMapVmMaxSize(boolean useDirect) throws IOException {
+    // At start
+    Exception ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 
1).readMapStart());
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Next
+    ex = Assertions.assertThrows(UnsupportedOperationException.class,
+        () -> this.newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 1).mapNext());
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Two OK reads followed by going over the VM limit.
+    Decoder bd = newDecoder(useDirect, MAX_ARRAY_VM_LIMIT - 100, 100, 1);
+    Assertions.assertEquals(MAX_ARRAY_VM_LIMIT - 100, bd.readMapStart());
+    Assertions.assertEquals(100, bd.mapNext());
+    ex = Assertions.assertThrows(UnsupportedOperationException.class, 
bd::mapNext);
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+    // Two OK reads followed by going over the VM limit, where negative 
numbers are
+    // followed by the byte length of the items. For testing, the 999 values 
are
+    // read but ignored.
+    bd = newDecoder(useDirect, 100 - MAX_ARRAY_VM_LIMIT, 999, -100, 999, 1);
+    Assertions.assertEquals(MAX_ARRAY_VM_LIMIT - 100, bd.readMapStart());
+    Assertions.assertEquals(100, bd.mapNext());
+    ex = Assertions.assertThrows(UnsupportedOperationException.class, 
bd::mapNext);
+    Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  public void testMapMaxCustom(boolean useDirect) throws IOException {
     try {
-      System.setProperty("org.apache.avro.limits.bytes.maxLength", 
Long.toString(maxLength));
-      Decoder bd = this.newDecoder(bad);
+      System.setProperty(SystemLimitException.MAX_COLLECTION_LENGTH_PROPERTY, 
Long.toString(128));
+      resetLimits();
+      Exception ex = 
Assertions.assertThrows(UnsupportedOperationException.class,
+          () -> newDecoder(useDirect, MAX_ARRAY_VM_LIMIT + 1).readMapStart());
+      Assertions.assertEquals(ERROR_VM_LIMIT_COLLECTION, ex.getMessage());
+
+      // Two OK reads followed by going over the custom limit.
+      Decoder bd = newDecoder(useDirect, 118, 10, 1);
+      Assertions.assertEquals(118, bd.readMapStart());
+      Assertions.assertEquals(10, bd.mapNext());
+      ex = Assertions.assertThrows(SystemLimitException.class, bd::mapNext);
+      Assertions.assertEquals("Collection length 129 exceeds maximum allowed", 
ex.getMessage());
+
+      // Two OK reads followed by going over the custom limit, where negative 
numbers are
+      // followed by the byte length of the items. For testing, the 999 values 
are
+      // read but ignored.
+      bd = newDecoder(useDirect, -118, 999, -10, 999, 1);
+      Assertions.assertEquals(118, bd.readMapStart());
+      Assertions.assertEquals(10, bd.mapNext());
+      ex = Assertions.assertThrows(SystemLimitException.class, bd::mapNext);
+      Assertions.assertEquals("Collection length 129 exceeds maximum allowed", 
ex.getMessage());
 
-      Assert.assertThrows("Bytes length " + (maxLength + 1) + " exceeds 
maximum allowed", AvroRuntimeException.class,
-          () -> bd.readBytes(null));
     } finally {
-      System.clearProperty("org.apache.avro.limits.bytes.maxLength");
+      
System.clearProperty(SystemLimitException.MAX_COLLECTION_LENGTH_PROPERTY);
+      resetLimits();
     }
   }
 
-  @Test(expected = UnsupportedOperationException.class)
-  public void testLongLengthEncoding() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void longLengthEncoding(boolean useDirect) {
     // Size equivalent to Integer.MAX_VALUE + 1
     byte[] bad = new byte[] { (byte) -128, (byte) -128, (byte) -128, (byte) 
-128, (byte) 16 };
-    Decoder bd = this.newDecoder(bad);
-    bd.readString();
+    Decoder bd = this.newDecoder(bad, useDirect);
+    Assertions.assertThrows(UnsupportedOperationException.class, 
bd::readString);
   }
 
-  @Test(expected = EOFException.class)
-  public void testIntTooShort() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void intTooShort(boolean useDirect) {
     byte[] badint = new byte[4];
     Arrays.fill(badint, (byte) 0xff);
-    newDecoder(badint).readInt();
+    Assertions.assertThrows(EOFException.class, () -> newDecoder(badint, 
useDirect).readInt());
   }
 
-  @Test(expected = EOFException.class)
-  public void testLongTooShort() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void longTooShort(boolean useDirect) {
     byte[] badint = new byte[9];
     Arrays.fill(badint, (byte) 0xff);
-    newDecoder(badint).readLong();
+    Assertions.assertThrows(EOFException.class, () -> newDecoder(badint, 
useDirect).readLong());
   }
 
-  @Test(expected = EOFException.class)
-  public void testFloatTooShort() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void floatTooShort(boolean useDirect) {
     byte[] badint = new byte[3];
     Arrays.fill(badint, (byte) 0xff);
-    newDecoder(badint).readInt();
+    Assertions.assertThrows(EOFException.class, () -> newDecoder(badint, 
useDirect).readInt());
   }
 
-  @Test(expected = EOFException.class)
-  public void testDoubleTooShort() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void doubleTooShort(boolean useDirect) {
     byte[] badint = new byte[7];
     Arrays.fill(badint, (byte) 0xff);
-    newDecoder(badint).readLong();
+    Assertions.assertThrows(EOFException.class, () -> newDecoder(badint, 
useDirect).readLong());
   }
 
-  @Test
-  public void testSkipping() throws IOException {
-    Decoder d = newDecoder(data);
-    skipGenerated(d);
-    if (d instanceof BinaryDecoder) {
-      BinaryDecoder bd = (BinaryDecoder) d;
-      try {
-        Assert.assertTrue(bd.isEnd());
-      } catch (UnsupportedOperationException e) {
-        // this is ok if its a DirectBinaryDecoder.
-        if (bd.getClass() != DirectBinaryDecoder.class) {
-          throw e;
-        }
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void skipping(boolean useDirect) throws IOException {
+    BinaryDecoder bd = newDecoder(data, useDirect);
+    skipGenerated(bd);
+
+    try {
+      Assertions.assertTrue(bd.isEnd());
+    } catch (UnsupportedOperationException e) {
+      // this is ok if it's a DirectBinaryDecoder.
+      if (bd.getClass() != DirectBinaryDecoder.class) {
+        throw e;
       }
-      bd = this.newDecoder(new ByteArrayInputStream(data), bd);
-      skipGenerated(bd);
-      try {
-        Assert.assertTrue(bd.isEnd());
-      } catch (UnsupportedOperationException e) {
-        // this is ok if its a DirectBinaryDecoder.
-        if (bd.getClass() != DirectBinaryDecoder.class) {
-          throw e;
-        }
+    }
+    bd = this.newDecoder(new ByteArrayInputStream(data), bd, useDirect);
+    skipGenerated(bd);
+    try {
+      Assertions.assertTrue(bd.isEnd());
+    } catch (UnsupportedOperationException e) {
+      // this is ok if it's a DirectBinaryDecoder.
+      if (bd.getClass() != DirectBinaryDecoder.class) {
+        throw e;
       }
     }
+
   }
 
   private void skipGenerated(Decoder bd) throws IOException {
@@ -496,19 +666,20 @@ public class TestBinaryDecoder {
     } catch (EOFException e) {
       eof = e;
     }
-    Assert.assertNotNull(eof);
+    Assertions.assertNotNull(eof);
   }
 
-  @Test(expected = EOFException.class)
-  public void testEOF() throws IOException {
+  @ParameterizedTest
+  @ValueSource(booleans = { true, false })
+  void eof(boolean useDirect) throws IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     Encoder e = EncoderFactory.get().binaryEncoder(baos, null);
     e.writeLong(0x10000000000000L);
     e.flush();
 
-    Decoder d = newDecoder(new ByteArrayInputStream(baos.toByteArray()));
-    Assert.assertEquals(0x10000000000000L, d.readLong());
-    d.readInt();
+    Decoder d = newDecoder(new ByteArrayInputStream(baos.toByteArray()), useDirect);
+    Assertions.assertEquals(0x10000000000000L, d.readLong());
+    Assertions.assertThrows(EOFException.class, () -> d.readInt());
   }
 
 }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java b/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java
index 918465a72..9c5ca5450 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/util/TestUtf8.java
@@ -19,9 +19,7 @@ package org.apache.avro.util;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.*;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -30,7 +28,9 @@ import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.charset.StandardCharsets;
 
-import org.junit.Test;
+import org.apache.avro.SystemLimitException;
+import org.apache.avro.TestSystemLimitException;
+import org.junit.jupiter.api.Test;
 
 public class TestUtf8 {
   @Test
@@ -98,6 +98,26 @@ public class TestUtf8 {
     assertEquals(3198781, u.hashCode());
   }
 
+  @Test
+  void oversizeUtf8() {
+    Utf8 u = new Utf8();
+    u.setByteLength(1024);
+    assertEquals(1024, u.getByteLength());
+    assertThrows(UnsupportedOperationException.class,
+        () -> u.setByteLength(TestSystemLimitException.MAX_ARRAY_VM_LIMIT + 1));
+
+    try {
+      System.setProperty(SystemLimitException.MAX_STRING_LENGTH_PROPERTY, Long.toString(1000L));
+      TestSystemLimitException.resetLimits();
+
+      Exception ex = assertThrows(SystemLimitException.class, () -> u.setByteLength(1024));
+      assertEquals("String length 1024 exceeds maximum allowed", ex.getMessage());
+    } finally {
+      System.clearProperty(SystemLimitException.MAX_STRING_LENGTH_PROPERTY);
+      TestSystemLimitException.resetLimits();
+    }
+  }
+
   @Test
   public void testSerialization() throws IOException, ClassNotFoundException {
     try (ByteArrayOutputStream bos = new ByteArrayOutputStream();

Reply via email to