This is an automated email from the ASF dual-hosted git repository.

fokko pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro.git


The following commit(s) were added to refs/heads/main by this push:
     new 876eae32d AVRO-3666 [Java] Use the new schema parser (#2642)
876eae32d is described below

commit 876eae32dbcf353313e85e926a7c0e11170985a2
Author: Oscar Westra van Holthe - Kind <[email protected]>
AuthorDate: Thu Apr 4 10:59:47 2024 +0200

    AVRO-3666 [Java] Use the new schema parser (#2642)
    
    * AVRO-3666: Redo schema parsing code
    
    This undoes the split schema parsing to allow forward references, which
    is to be handles by the SchemaParser & ParseContext classes. It uses the
    new ParseContext for the classic schema parser to accommodate this.
    
    Next step: use the new SchemaParser and resolve unresolved / forward
    references after parsing. This will also resolve "forward" references
    that were parsed in subsequent files.
    
    * AVRO-3666: Resolve references after parsing
    
    By resolving references after parsing, we allow both forward references
    within a file as between subsequent files.
    
    This change also includes using the new SchemaParser everywhere, as
    using it is the best way to flush out bugs.
    
    * AVRO-3666: Remove wrong test
    
    * AVRO-1535: Fix aliases as well
    
    * AVRO-3666: Re-enable disabled test
    
    Also includes changes necessary to debug.
    
    * AVRO-3666: Fix RAT exclusion
    
    The wrong exclusion was removed.
    
    * AVRO-3666: Remove unused field
    
    * AVRO-3666: Introduce SchemaParser.ParseResult
    
    This ensures the SchemaParser never returns unresolved schemata.
    
    * AVRO-3666: Use SchemaParser for documentation
    
    * AVRO-3666: Refactor after review
    
    * AVRO-3666: Fix javadoc
    
    * AVRO-3666: Fix merge bug
    
    * AVRO-3666: Fix CodeQL warnings
    
    * AVRO-3666: Increase test coverage
    
    * AVRO-3666: Fix tests
    
    * AVRO-3666: Refactor schema parsing for readability
    
    The JSON schema parser is quite complex (it is a large method). This
    change splits it in multiple methods, naming the various stages.
    
    * AVRO-3666: rename method to avoid confusion
    
    * AVRO-3666: Reduce PR size
    
    This change reduces the PR size, but does require some extra work after
    merging: the new SchemaParser class is hardly used, and the (now)
    obsolete Schema.Parser class is used heavily.
    
    * AVRO-3666: Reduce PR size more
    
    * AVRO-3666: Reduce PR size again
    
    * AVRO-3666: Spotless
    
    * Update lang/java/avro/src/main/java/org/apache/avro/Schema.java
    
    Co-authored-by: Fokko Driesprong <[email protected]>
    
    * AVRO-3666: Spotless
    
    ---------
    
    Co-authored-by: Fokko Driesprong <[email protected]>
---
 .../++version++/Getting started (Java)/_index.md   |   2 +-
 .../org/apache/avro/FormattedSchemaParser.java     |   8 +-
 .../java/org/apache/avro/JsonSchemaParser.java     |  28 +-
 .../main/java/org/apache/avro/ParseContext.java    | 198 ++++--
 .../src/main/java/org/apache/avro/Protocol.java    | 129 ++--
 .../avro/src/main/java/org/apache/avro/Schema.java | 759 +++++++++------------
 .../main/java/org/apache/avro/SchemaParser.java    |  69 +-
 .../java/org/apache/avro/util/SchemaResolver.java  | 166 +----
 .../java/org/apache/avro/DummySchemaParser.java    |   2 +-
 .../java/org/apache/avro/ParseContextTest.java     |  19 +-
 .../src/test/java/org/apache/avro/TestSchema.java  |  30 +-
 .../java/org/apache/avro/TestSchemaParser.java     |  32 +-
 .../apache/avro/compiler/idl/SchemaResolver.java   |  14 +-
 .../avro/compiler/specific/SpecificCompiler.java   |  13 +-
 .../javacc/org/apache/avro/compiler/idl/idl.jj     |   2 +
 .../src/main/java/org/apache/avro/idl/IdlFile.java |  68 +-
 .../main/java/org/apache/avro/idl/IdlReader.java   |  35 +-
 .../src/test/idl/input/schema_syntax_schema.avdl   |   2 +-
 .../java/idl/src/test/idl/input/status_schema.avdl |   2 +
 .../test/java/org/apache/avro/idl/TestCycle.java   |   2 +-
 .../main/java/org/apache/avro/mojo/IDLMojo.java    |   1 -
 .../main/java/org/apache/avro/mojo/SchemaMojo.java |  11 +-
 .../main/java/org/apache/avro/tool/IdlTool.java    |   1 -
 23 files changed, 809 insertions(+), 784 deletions(-)

diff --git a/doc/content/en/docs/++version++/Getting started (Java)/_index.md 
b/doc/content/en/docs/++version++/Getting started (Java)/_index.md
index 2d964c9c1..429e98376 100644
--- a/doc/content/en/docs/++version++/Getting started (Java)/_index.md  
+++ b/doc/content/en/docs/++version++/Getting started (Java)/_index.md  
@@ -212,7 +212,7 @@ Let's go over the same example as in the previous section, 
but without using cod
 First, we use a SchemaParser to read our schema definition and create a Schema 
object.
 
 ```java
-Schema schema = new SchemaParser().parse(new File("user.avsc"));
+Schema schema = new SchemaParser().parse(new File("user.avsc")).mainSchema();
 ```
 
 Using this schema, let's create some users.
diff --git 
a/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java 
b/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java
index cd67788fa..c37eca15d 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/FormattedSchemaParser.java
@@ -50,7 +50,7 @@ public interface FormattedSchemaParser {
    * when expecting JSON), it is a good idea not to do anything (especially
    * calling methods on the @code ParseContext}).</li>
    * <li>The parameter {@code parseContext} is not thread-safe.</li>
-   * <li>When parsing, all parsed schema definitions should be added to the
+   * <li>All named schema definitions that are parsed should be added to the
    * provided {@link ParseContext}.</li>
    * <li>Optionally, you may return a "main" schema. Some schema definitions 
have
    * one, for example the schema defined by the root of the JSON document in a
@@ -62,9 +62,9 @@ public interface FormattedSchemaParser {
    * the parsing process, so reserve that for rethrowing exceptions.</li>
    * </ul>
    *
-   * @param parseContext    the current parse context: all parsed schemata 
should
-   *                        be added here to resolve names with; contains all
-   *                        previously known types
+   * @param parseContext    the current parse context: all named schemata that 
are
+   *                        parsed should be added here, otherwise resolving
+   *                        schemata can fail; contains all previously known 
types
    * @param baseUri         the base location of the schema, or {@code null} if
    *                        not known
    * @param formattedSchema the text of the schema definition(s) to parse
diff --git a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java 
b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
index c7d918786..5dd532444 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/JsonSchemaParser.java
@@ -57,26 +57,32 @@ public class JsonSchemaParser implements 
FormattedSchemaParser {
     for (String fragment : fragments) {
       buffer.append(fragment);
     }
-    return new JsonSchemaParser().parse(new 
ParseContext(NameValidator.NO_VALIDATION), buffer, null);
+
+    boolean saved = Schema.getValidateDefaults();
+    try {
+      Schema.setValidateDefaults(false);
+      ParseContext context = new ParseContext(NameValidator.NO_VALIDATION);
+      Schema schema = new JsonSchemaParser().parse(context, buffer, true);
+      context.commit();
+      context.resolveAllSchemas();
+      return context.resolve(schema);
+    } finally {
+      Schema.setValidateDefaults(saved);
+    }
   }
 
   @Override
   public Schema parse(ParseContext parseContext, URI baseUri, CharSequence 
formattedSchema)
       throws IOException, SchemaParseException {
-    return parse(parseContext, formattedSchema, parseContext.nameValidator);
+    return parse(parseContext, formattedSchema, false);
   }
 
-  private Schema parse(ParseContext parseContext, CharSequence 
formattedSchema, NameValidator nameValidator)
+  private Schema parse(ParseContext parseContext, CharSequence 
formattedSchema, boolean allowInvalidDefaults)
       throws SchemaParseException {
-    Schema.Parser parser = new Schema.Parser(nameValidator);
-    if (nameValidator == NameValidator.NO_VALIDATION) {
+    Schema.Parser parser = new Schema.Parser(parseContext);
+    if (allowInvalidDefaults) {
       parser.setValidateDefaults(false);
-    } else {
-      parser = new Schema.Parser(nameValidator);
     }
-    parser.addTypes(parseContext.typesByName().values());
-    Schema schema = parser.parse(formattedSchema.toString());
-    parser.getTypes().values().forEach(parseContext::put);
-    return schema;
+    return parser.parseInternal(formattedSchema.toString());
   }
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java 
b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
index 401c93e50..b7bc42b97 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/ParseContext.java
@@ -18,25 +18,36 @@
 package org.apache.avro;
 
 import org.apache.avro.util.SchemaResolver;
+import org.apache.avro.util.Schemas;
 
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.requireNonNull;
 
 /**
  * Class to define a name context, useful to reference schemata with. This
  * allows for the following:
  *
  * <ul>
- * <li>Provide a default namespace for nested contexts, as found for example in
- * JSON based schema definitions.</li>
- * <li>Find schemata by name, including primitives.</li>
  * <li>Collect new named schemata.</li>
+ * <li>Find schemata by name, including primitives.</li>
+ * <li>Find schemas that do not exist yet.</li>
+ * <li>Resolve references to schemas that didn't exist yet when first 
used.</li>
  * </ul>
  *
  * <p>
+ * This class is NOT thread-safe.
+ * </p>
+ *
+ * <p>
  * Note: this class has no use for most Avro users, but is a key component when
  * implementing a schema parser.
  * </p>
@@ -60,10 +71,27 @@ public class ParseContext {
 
   private static final Set<Schema.Type> NAMED_SCHEMA_TYPES = 
EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM,
       Schema.Type.FIXED);
+  /**
+   * Collection of old schemata. Can contain unresolved references if 
!isResolved.
+   */
   private final Map<String, Schema> oldSchemas;
+  /**
+   * Collection of new schemata. Can contain unresolved references.
+   */
   private final Map<String, Schema> newSchemas;
+  /**
+   * The name validator to use.
+   */
   // Visible for use in JsonSchemaParser
   final NameValidator nameValidator;
+  /**
+   * Visitor that was used to resolve schemata with. If not available, some
+   * schemata in {@code oldSchemas} may not be fully resolved. If available, 
all
+   * schemata in {@code oldSchemas} are resolved, and {@code newSchemas} is 
empty.
+   * After visiting a schema, it can return the corresponding resolved schema 
for
+   * a schema that possibly contains unresolved references.
+   */
+  private SchemaResolver.ResolvingVisitor resolvingVisitor;
 
   /**
    * Create a {@code ParseContext} for the default/{@code null} namespace, 
using
@@ -78,22 +106,14 @@ public class ParseContext {
    * schemata.
    */
   public ParseContext(NameValidator nameValidator) {
-    this(nameValidator, new LinkedHashMap<>(), new LinkedHashMap<>());
+    this(requireNonNull(nameValidator), new LinkedHashMap<>(), new 
LinkedHashMap<>());
   }
 
   private ParseContext(NameValidator nameValidator, Map<String, Schema> 
oldSchemas, Map<String, Schema> newSchemas) {
     this.nameValidator = nameValidator;
     this.oldSchemas = oldSchemas;
     this.newSchemas = newSchemas;
-  }
-
-  /**
-   * Create a derived context using a different fallback namespace.
-   *
-   * @return a new context
-   */
-  public ParseContext namespace() {
-    return new ParseContext(nameValidator, oldSchemas, newSchemas);
+    resolvingVisitor = null;
   }
 
   /**
@@ -109,45 +129,64 @@ public class ParseContext {
 
   /**
    * <p>
-   * Resolve a schema by name.
+   * Find a schema by name and namespace.
    * </p>
    *
    * <p>
    * That is:
    * </p>
    *
-   * <ul>
-   * <li>If {@code fullName} is a primitive name, return a (new) schema for
-   * it</li>
-   * <li>Otherwise: resolve the schema in its own namespace and in the null
-   * namespace (the former takes precedence)</li>
-   * </ul>
+   * <ol>
+   * <li>If {@code name} is a primitive name, return a (new) schema for it</li>
+   * <li>Otherwise, determine the full schema name (using the given
+   * {@code namespace} if necessary), and find it</li>
+   * <li>If no schema was found and {@code name} is a simple name, find the 
schema
+   * in the default (null) namespace</li>
+   * <li>If still no schema was found, return an unresolved reference for the 
full
+   * schema name (see step 2)</li>
+   * </ol>
    *
-   * Resolving means that the schema is returned if known, and otherwise an
-   * unresolved schema (a reference) is returned.
+   * <p>
+   * Note: as an unresolved reference might be returned, the schema is not
+   * directly usable. Please {@link #put(Schema)} the schema using it in the
+   * context. The {@link SchemaParser} and protocol parsers will ensure you'll
+   * only get a resolved schema that is usable.
+   * </p>
    *
-   * @param fullName the full schema name to resolve
-   * @return the schema
-   * @throws SchemaParseException when the schema does not exist
+   * @param name      the schema name to find
+   * @param namespace the namespace to find the schema against
+   * @return the schema, or an unresolved reference
    */
-  public Schema resolve(String fullName) {
-    Schema.Type type = PRIMITIVES.get(fullName);
+  public Schema find(String name, String namespace) {
+    Schema.Type type = PRIMITIVES.get(name);
     if (type != null) {
       return Schema.create(type);
     }
 
-    Schema schema = getSchema(fullName);
+    String fullName = fullName(name, namespace);
+    Schema schema = getNamedSchema(fullName);
     if (schema == null) {
-      // Not found; attempt to resolve in the default namespace
-      int lastDot = fullName.lastIndexOf('.');
-      String name = fullName.substring(lastDot + 1);
-      schema = getSchema(name);
+      schema = getNamedSchema(name);
     }
 
     return schema != null ? schema : SchemaResolver.unresolvedSchema(fullName);
   }
 
-  private Schema getSchema(String fullName) {
+  private String fullName(String name, String namespace) {
+    if (namespace != null && name.lastIndexOf('.') < 0) {
+      return namespace + "." + name;
+    }
+    return name;
+  }
+
+  /**
+   * Get a schema by name. Note that the schema might not (yet) be 
resolved/usable
+   * until {@link #resolveAllSchemas()} has been called.
+   *
+   * @param fullName a full schema name
+   * @return the schema, if known
+   */
+  public Schema getNamedSchema(String fullName) {
     Schema schema = oldSchemas.get(fullName);
     if (schema == null) {
       schema = newSchemas.get(fullName);
@@ -155,10 +194,6 @@ public class ParseContext {
     return schema;
   }
 
-  private boolean notEmpty(String str) {
-    return str != null && !str.isEmpty();
-  }
-
   /**
    * Put the schema into this context. This is an idempotent operation: it only
    * fails if this context already has a different schema with the same name.
@@ -184,6 +219,7 @@ public class ParseContext {
         throw new SchemaParseException("Can't redefine: " + fullName);
       }
     } else {
+      resolvingVisitor = null;
       Schema previouslyAddedSchema = newSchemas.putIfAbsent(fullName, schema);
       if (previouslyAddedSchema != null && 
!previouslyAddedSchema.equals(schema)) {
         throw new SchemaParseException("Can't redefine: " + fullName);
@@ -200,10 +236,10 @@ public class ParseContext {
     return fullName;
   }
 
-  private void validateName(String name, String what) {
+  private void validateName(String name, String typeOfName) {
     NameValidator.Result result = nameValidator.validate(name);
     if (!result.isOK()) {
-      throw new SchemaParseException(what + " \"" + name + "\" is invalid: " + 
result.getErrors());
+      throw new SchemaParseException(typeOfName + " \"" + name + "\" is 
invalid: " + result.getErrors());
     }
   }
 
@@ -216,12 +252,94 @@ public class ParseContext {
     newSchemas.clear();
   }
 
+  public SchemaParser.ParseResult commit(Schema mainSchema) {
+    Collection<Schema> parsedNamedSchemas = newSchemas.values();
+    SchemaParser.ParseResult parseResult = new SchemaParser.ParseResult() {
+      @Override
+      public Schema mainSchema() {
+        return mainSchema == null ? null : resolve(mainSchema);
+      }
+
+      @Override
+      public List<Schema> parsedNamedSchemas() {
+        return 
parsedNamedSchemas.stream().map(ParseContext.this::resolve).collect(Collectors.toList());
+      }
+    };
+    commit();
+    return parseResult;
+  }
+
   public void rollback() {
     newSchemas.clear();
   }
 
   /**
-   * Return all known types by their fullname.
+   * Resolve all (named) schemas that were parsed. This resolves all forward
+   * references, even if parsed from different files. Note: the context must be
+   * committed for this method to work.
+   *
+   * @return all parsed schemas, in the order they were parsed
+   * @throws AvroTypeException if a schema reference cannot be resolved
+   */
+  public List<Schema> resolveAllSchemas() {
+    ensureSchemasAreResolved();
+
+    return new ArrayList<>(oldSchemas.values());
+  }
+
+  private void ensureSchemasAreResolved() {
+    if (hasNewSchemas()) {
+      throw new IllegalStateException("Schemas cannot be resolved unless the 
ParseContext is committed.");
+    }
+    if (resolvingVisitor == null) {
+      NameValidator saved = Schema.getNameValidator();
+      try {
+        // Ensure we use the same validation when copying schemas as when they 
were
+        // defined.
+        Schema.setNameValidator(nameValidator);
+        SchemaResolver.ResolvingVisitor visitor = new 
SchemaResolver.ResolvingVisitor(oldSchemas::get);
+        oldSchemas.values().forEach(schema -> Schemas.visit(schema, visitor));
+        // Before this point is where we can get exceptions due to resolving 
failures.
+        for (Map.Entry<String, Schema> entry : oldSchemas.entrySet()) {
+          entry.setValue(visitor.getResolved(entry.getValue()));
+        }
+        resolvingVisitor = visitor;
+      } finally {
+        Schema.setNameValidator(saved);
+      }
+    }
+  }
+
+  /**
+   * Resolve unresolved references in a schema <em>that was parsed for this
+   * context</em> using the types known to this context. Note: this method will
+   * ensure all known schemas are resolved, or throw, and thus requires the
+   * context to be committed.
+   *
+   * @param schema the schema resolve
+   * @return the fully resolved schema
+   * @throws AvroTypeException if a schema reference cannot be resolved
+   */
+  public Schema resolve(Schema schema) {
+    ensureSchemasAreResolved();
+
+    // As all (named) schemas are resolved now, we know:
+    // — All named types are either in oldSchemas or unknown.
+    // — All unnamed types can be visited&resolved without validation.
+
+    if (NAMED_SCHEMA_TYPES.contains(schema.getType()) && schema.getFullName() 
!= null) {
+      return requireNonNull(oldSchemas.get(schema.getFullName()), () -> 
"Unknown schema: " + schema.getFullName());
+    } else {
+      // Unnamed or anonymous schema
+      // (protocol message request parameters are anonymous records)
+      Schemas.visit(schema, resolvingVisitor); // This field is set, as 
ensureSchemasAreResolved(); was called.
+      return resolvingVisitor.getResolved(schema);
+    }
+  }
+
+  /**
+   * Return all known types by their fullname. Warning: this returns all types,
+   * even uncommitted ones, including unresolved references!
    *
    * @return a map of all types by their name
    */
diff --git a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java 
b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java
index e837e6008..905f2778c 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Protocol.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Protocol.java
@@ -17,12 +17,6 @@
  */
 package org.apache.avro;
 
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.avro.Schema.Field;
-import org.apache.avro.Schema.Field.Order;
-
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.IOException;
@@ -42,6 +36,12 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Field.Order;
+
 /**
  * A set of messages forming an application protocol.
  * <p>
@@ -79,9 +79,9 @@ public class Protocol extends JsonProperties {
 
   /** A protocol message. */
   public class Message extends JsonProperties {
-    private String name;
-    private String doc;
-    private Schema request;
+    private final String name;
+    private final String doc;
+    private final Schema request;
 
     /** Construct a message. */
     private Message(String name, String doc, JsonProperties propMap, Schema 
request) {
@@ -132,7 +132,7 @@ public class Protocol extends JsonProperties {
       try {
         StringWriter writer = new StringWriter();
         JsonGenerator gen = Schema.FACTORY.createGenerator(writer);
-        toJson(gen);
+        toJson(new HashSet<>(), gen);
         gen.flush();
         return writer.toString();
       } catch (IOException e) {
@@ -140,19 +140,19 @@ public class Protocol extends JsonProperties {
       }
     }
 
-    void toJson(JsonGenerator gen) throws IOException {
+    void toJson(Set<String> knownNames, JsonGenerator gen) throws IOException {
       gen.writeStartObject();
       if (doc != null)
         gen.writeStringField("doc", doc);
       writeProps(gen); // write out properties
       gen.writeFieldName("request");
-      request.fieldsToJson(types, gen);
+      request.fieldsToJson(knownNames, namespace, gen);
 
-      toJson1(gen);
+      toJson1(knownNames, gen);
       gen.writeEndObject();
     }
 
-    void toJson1(JsonGenerator gen) throws IOException {
+    void toJson1(Set<String> knownNames, JsonGenerator gen) throws IOException 
{
       gen.writeStringField("response", "null");
       gen.writeBooleanField("one-way", true);
     }
@@ -177,9 +177,9 @@ public class Protocol extends JsonProperties {
     }
   }
 
-  private class TwoWayMessage extends Message {
-    private Schema response;
-    private Schema errors;
+  private final class TwoWayMessage extends Message {
+    private final Schema response;
+    private final Schema errors;
 
     /** Construct a message. */
     private TwoWayMessage(String name, String doc, Map<String, ?> propMap, 
Schema request, Schema response,
@@ -227,15 +227,15 @@ public class Protocol extends JsonProperties {
     }
 
     @Override
-    void toJson1(JsonGenerator gen) throws IOException {
+    void toJson1(Set<String> knownNames, JsonGenerator gen) throws IOException 
{
       gen.writeFieldName("response");
-      response.toJson(types, gen);
+      response.toJson(knownNames, namespace, gen);
 
       List<Schema> errs = errors.getTypes(); // elide system error
       if (errs.size() > 1) {
         Schema union = Schema.createUnion(errs.subList(1, errs.size()));
         gen.writeFieldName("errors");
-        union.toJson(types, gen);
+        union.toJson(knownNames, namespace, gen);
       }
     }
 
@@ -245,7 +245,7 @@ public class Protocol extends JsonProperties {
   private String namespace;
   private String doc;
 
-  private Schema.Names types = new Schema.Names();
+  private ParseContext context = new ParseContext();
   private final Map<String, Message> messages = new LinkedHashMap<>();
   private byte[] md5;
 
@@ -267,6 +267,7 @@ public class Protocol extends JsonProperties {
    * {@code doc}, and {@code namespace} as {code p} has. It also copies all the
    * {@code props}.
    */
+  @SuppressWarnings("CopyConstructorMissesField")
   public Protocol(Protocol p) {
     this(p.getName(), p.getDoc(), p.getNamespace());
     putAll(p);
@@ -294,7 +295,6 @@ public class Protocol extends JsonProperties {
     if (this.namespace != null && this.namespace.isEmpty()) {
       this.namespace = null;
     }
-    types.space(this.namespace);
   }
 
   /** The name of this protocol. */
@@ -314,19 +314,30 @@ public class Protocol extends JsonProperties {
 
   /** The types of this protocol. */
   public Collection<Schema> getTypes() {
-    return types.values();
+    return context.resolveAllSchemas();
+  }
+
+  /** @deprecated can return invalid schemata: do NOT use! */
+  @Deprecated
+  public Collection<Schema> getUnresolvedTypes() {
+    return context.typesByName().values();
   }
 
   /** Returns the named type. */
   public Schema getType(String name) {
-    return types.get(name);
+    Schema namedSchema = null;
+    if (!name.contains(".")) {
+      namedSchema = context.getNamedSchema(namespace + "." + name);
+    }
+    return namedSchema != null ? namedSchema : context.getNamedSchema(name);
   }
 
   /** Set the types of this protocol. */
   public void setTypes(Collection<Schema> newTypes) {
-    types = new Schema.Names();
+    context = new ParseContext();
     for (Schema s : newTypes)
-      types.add(s);
+      context.put(s);
+    context.commit();
   }
 
   /** The messages of this protocol. */
@@ -349,12 +360,12 @@ public class Protocol extends JsonProperties {
   }
 
   /** Create a one-way message. */
-  public <T> Message createMessage(String name, String doc, JsonProperties 
propMap, Schema request) {
+  public Message createMessage(String name, String doc, JsonProperties 
propMap, Schema request) {
     return new Message(name, doc, propMap, request);
   }
 
   /** Create a one-way message. */
-  public <T> Message createMessage(String name, String doc, Map<String, ?> 
propMap, Schema request) {
+  public Message createMessage(String name, String doc, Map<String, ?> 
propMap, Schema request) {
     return new Message(name, doc, propMap, request);
   }
 
@@ -373,13 +384,13 @@ public class Protocol extends JsonProperties {
   }
 
   /** Create a two-way message. */
-  public <T> Message createMessage(String name, String doc, JsonProperties 
propMap, Schema request, Schema response,
+  public Message createMessage(String name, String doc, JsonProperties 
propMap, Schema request, Schema response,
       Schema errors) {
     return new TwoWayMessage(name, doc, propMap, request, response, errors);
   }
 
   /** Create a two-way message. */
-  public <T> Message createMessage(String name, String doc, Map<String, ?> 
propMap, Schema request, Schema response,
+  public Message createMessage(String name, String doc, Map<String, ?> 
propMap, Schema request, Schema response,
       Schema errors) {
     return new TwoWayMessage(name, doc, propMap, request, response, errors);
   }
@@ -392,13 +403,13 @@ public class Protocol extends JsonProperties {
       return false;
     Protocol that = (Protocol) o;
     return Objects.equals(this.name, that.name) && 
Objects.equals(this.namespace, that.namespace)
-        && Objects.equals(this.types, that.types) && 
Objects.equals(this.messages, that.messages)
-        && this.propsEqual(that);
+        && Objects.equals(this.context.resolveAllSchemas(), 
that.context.resolveAllSchemas())
+        && Objects.equals(this.messages, that.messages) && 
this.propsEqual(that);
   }
 
   @Override
   public int hashCode() {
-    return 31 * Objects.hash(name, namespace, types, messages) + 
propsHashCode();
+    return 31 * Objects.hash(name, namespace, context, messages) + 
propsHashCode();
   }
 
   /** Render this as <a href="https://json.org/";>JSON</a>. */
@@ -427,8 +438,6 @@ public class Protocol extends JsonProperties {
   }
 
   void toJson(JsonGenerator gen) throws IOException {
-    types.space(namespace);
-
     gen.writeStartObject();
     gen.writeStringField("protocol", name);
     if (namespace != null) {
@@ -439,16 +448,16 @@ public class Protocol extends JsonProperties {
       gen.writeStringField("doc", doc);
     writeProps(gen);
     gen.writeArrayFieldStart("types");
-    Schema.Names resolved = new Schema.Names(namespace);
-    for (Schema type : types.values())
-      if (!resolved.contains(type))
-        type.toJson(resolved, gen);
+    Set<String> knownNames = new HashSet<>();
+    for (Schema type : context.resolveAllSchemas())
+      if (!knownNames.contains(type.getFullName()))
+        type.toJson(knownNames, namespace, gen);
     gen.writeEndArray();
 
     gen.writeObjectFieldStart("messages");
     for (Map.Entry<String, Message> e : messages.entrySet()) {
       gen.writeFieldName(e.getKey());
-      e.getValue().toJson(gen);
+      e.getValue().toJson(knownNames, gen);
     }
     gen.writeEndObject();
     gen.writeEndObject();
@@ -510,6 +519,27 @@ public class Protocol extends JsonProperties {
     parseMessages(json);
     parseDoc(json);
     parseProps(json);
+
+    context.commit();
+    context.resolveAllSchemas();
+    resolveMessageSchemata();
+  }
+
+  private void resolveMessageSchemata() {
+    for (Map.Entry<String, Message> entry : messages.entrySet()) {
+      Message oldValue = entry.getValue();
+      Message newValue;
+      if (oldValue.isOneWay()) {
+        newValue = createMessage(oldValue.getName(), oldValue.getDoc(), 
oldValue,
+            context.resolve(oldValue.getRequest()));
+      } else {
+        Schema request = context.resolve(oldValue.getRequest());
+        Schema response = context.resolve(oldValue.getResponse());
+        Schema errors = context.resolve(oldValue.getErrors());
+        newValue = createMessage(oldValue.getName(), oldValue.getDoc(), 
oldValue, request, response, errors);
+      }
+      entry.setValue(newValue);
+    }
   }
 
   private void parseNameAndNamespace(JsonNode json) {
@@ -544,11 +574,7 @@ public class Protocol extends JsonProperties {
     for (JsonNode type : defs) {
       if (!type.isObject())
         throw new SchemaParseException("Type not an object: " + type);
-      Schema.parseNamesDeclared(type, types, types.space());
-
-    }
-    for (JsonNode type : defs) {
-      Schema.parseCompleteSchema(type, types, types.space());
+      Schema.parse(type, context, namespace);
     }
   }
 
@@ -596,8 +622,8 @@ public class Protocol extends JsonProperties {
       JsonNode fieldDocNode = field.get("doc");
       if (fieldDocNode != null)
         fieldDoc = fieldDocNode.textValue();
-      Field newField = new Field(name, Schema.parse(fieldTypeNode, types), 
fieldDoc, field.get("default"), true,
-          Order.ASCENDING);
+      Field newField = new Field(name, Schema.parse(fieldTypeNode, context, 
namespace), fieldDoc, field.get("default"),
+          true, Order.ASCENDING);
       Set<String> aliases = Schema.parseAliases(field);
       if (aliases != null) { // add aliases
         for (String alias : aliases)
@@ -612,7 +638,7 @@ public class Protocol extends JsonProperties {
       }
       fields.add(newField);
     }
-    Schema request = Schema.createRecord(fields);
+    Schema request = Schema.createRecord(null, null, null, false, fields);
 
     boolean oneWay = false;
     JsonNode oneWayNode = json.get("one-way");
@@ -631,12 +657,12 @@ public class Protocol extends JsonProperties {
     if (oneWay) {
       if (decls != null)
         throw new SchemaParseException("one-way can't have errors: " + json);
-      if (responseNode != null && Schema.parse(responseNode, types).getType() 
!= Schema.Type.NULL)
+      if (responseNode != null && Schema.parse(responseNode, context, 
namespace).getType() != Schema.Type.NULL)
         throw new SchemaParseException("One way response must be null: " + 
json);
       return new Message(messageName, doc, mProps, request);
     }
 
-    Schema response = Schema.parse(responseNode, types);
+    Schema response = Schema.parse(responseNode, context, namespace);
 
     List<Schema> errs = new ArrayList<>();
     errs.add(SYSTEM_ERROR); // every method can throw
@@ -645,7 +671,7 @@ public class Protocol extends JsonProperties {
         throw new SchemaParseException("Errors not an array: " + json);
       for (JsonNode decl : decls) {
         String name = decl.textValue();
-        Schema schema = this.types.get(name);
+        Schema schema = this.context.find(name, namespace);
         if (schema == null)
           throw new SchemaParseException("Undefined error: " + name);
         if (!schema.isError())
@@ -660,5 +686,4 @@ public class Protocol extends JsonProperties {
   public static void main(String[] args) throws Exception {
     System.out.println(Protocol.parse(new File(args[0])));
   }
-
 }
diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java 
b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
index 5e62bd110..e38e31d40 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java
@@ -46,7 +46,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.IdentityHashMap;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -55,7 +54,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static org.apache.avro.LogicalType.LOGICAL_TYPE_PROP;
 
@@ -80,9 +78,9 @@ import static org.apache.avro.LogicalType.LOGICAL_TYPE_PROP;
  * <li><i>null</i>.
  * </ul>
  *
- * A schema can be constructed using one of its static <tt>createXXX</tt>
- * methods, or more conveniently using {@link SchemaBuilder}. The schema 
objects
- * are <i>logically</i> immutable. There are only two mutating methods -
+ * Construct a schema using one of its static <tt>createXXX</tt> methods, or
+ * more conveniently using {@link SchemaBuilder}. The schema objects are
+ * <i>logically</i> immutable. There are only two mutating methods -
  * {@link #setFields(List)} and {@link #addProp(String, String)}. The following
  * restrictions apply on these two methods.
  * <ul>
@@ -93,6 +91,7 @@ import static org.apache.avro.LogicalType.LOGICAL_TYPE_PROP;
  * property.
  * </ul>
  */
+@SuppressWarnings("unused")
 public abstract class Schema extends JsonProperties implements Serializable {
 
   private static final long serialVersionUID = 1L;
@@ -125,20 +124,20 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     FACTORY.setCodec(MAPPER);
   }
 
-  /** The type of a schema. */
+  /** The type of schema. */
   public enum Type {
     RECORD, ENUM, ARRAY, MAP, UNION, FIXED, STRING, BYTES, INT, LONG, FLOAT, 
DOUBLE, BOOLEAN, NULL;
 
     private final String name;
 
-    private Type() {
+    Type() {
       this.name = this.name().toLowerCase(Locale.ENGLISH);
     }
 
     public String getName() {
       return name;
     }
-  };
+  }
 
   private final Type type;
   private LogicalType logicalType = null;
@@ -206,9 +205,9 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * Create an anonymous record schema.
    *
    * @deprecated This method allows to create Schema objects that cannot be 
parsed
-   *             by {@link Schema.Parser#parse(String)}. It will be removed in 
a
-   *             future version of Avro. Better use
-   *             i{@link #createRecord(String, String, String, boolean, List)} 
to
+   *             by {@link SchemaParser#parse(CharSequence)}. It will be 
removed
+   *             in a future version of Avro. Better use
+   *             {@link #createRecord(String, String, String, boolean, List)} 
to
    *             produce a fully qualified Schema.
    */
   @Deprecated
@@ -273,7 +272,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * <tt>fieldName</tt>. If there is no field by that name, a <tt>null</tt> is
    * returned.
    */
-  public Field getField(String fieldname) {
+  public Field getField(String fieldName) {
     throw new AvroRuntimeException("Not a record: " + this);
   }
 
@@ -406,7 +405,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * @param pretty if true, pretty-print JSON.
    */
   public String toString(boolean pretty) {
-    return toString(new Names(), pretty);
+    return toString(new HashSet<String>(), pretty);
   }
 
   /**
@@ -419,22 +418,22 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
   // Use at your own risk. This method should be removed with AVRO-2832.
   @Deprecated
   public String toString(Collection<Schema> referencedSchemas, boolean pretty) 
{
-    Schema.Names names = new Schema.Names();
+    Set<String> knownNames = new HashSet<>();
     if (referencedSchemas != null) {
       for (Schema s : referencedSchemas) {
-        names.add(s);
+        knownNames.add(s.getFullName());
       }
     }
-    return toString(names, pretty);
+    return toString(knownNames, pretty);
   }
 
-  String toString(Names names, boolean pretty) {
+  String toString(Set<String> knownNames, boolean pretty) {
     try {
       StringWriter writer = new StringWriter();
       JsonGenerator gen = FACTORY.createGenerator(writer);
       if (pretty)
         gen.useDefaultPrettyPrinter();
-      toJson(names, gen);
+      toJson(knownNames, null, gen);
       gen.flush();
       return writer.toString();
     } catch (IOException e) {
@@ -442,7 +441,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
   }
 
-  void toJson(Names names, JsonGenerator gen) throws IOException {
+  void toJson(Set<String> knownNames, String namespace, JsonGenerator gen) 
throws IOException {
     if (!hasProps()) { // no props defined
       gen.writeString(getName()); // just write name
     } else {
@@ -453,7 +452,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
   }
 
-  void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
+  void fieldsToJson(Set<String> knownNames, String namespace, JsonGenerator 
gen) throws IOException {
     throw new AvroRuntimeException("Not a record: " + this);
   }
 
@@ -487,12 +486,12 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
   private static final Set<String> FIELD_RESERVED = Collections
       .unmodifiableSet(new HashSet<>(Arrays.asList("default", "doc", "name", 
"order", "type", "aliases")));
 
-  /** Returns true if this record is an union type. */
+  /** Returns true if this record is a union type. */
   public boolean isUnion() {
     return this instanceof UnionSchema;
   }
 
-  /** Returns true if this record is an union type containing null. */
+  /** Returns true if this record is a union type containing null. */
   public boolean isNullable() {
     if (!isUnion()) {
       return getType().equals(Schema.Type.NULL);
@@ -581,14 +580,14 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
      *
      */
     public Field(String name, Schema schema) {
-      this(name, schema, (String) null, (JsonNode) null, true, 
Order.ASCENDING);
+      this(name, schema, null, null, true, Order.ASCENDING);
     }
 
     /**
      *
      */
     public Field(String name, Schema schema, String doc) {
-      this(name, schema, doc, (JsonNode) null, true, Order.ASCENDING);
+      this(name, schema, doc, null, true, Order.ASCENDING);
     }
 
     /**
@@ -613,7 +612,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
 
     public String name() {
       return name;
-    };
+    }
 
     /** The position of this field within the record. */
     public int pos() {
@@ -698,95 +697,6 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     public String toString() {
       return name + " type:" + schema.type + " pos:" + position;
     }
-
-    /**
-     * Parse field.
-     *
-     * @param field     : json field definition.
-     * @param names     : names map.
-     * @param namespace : current working namespace.
-     * @return field.
-     */
-    static Field parse(JsonNode field, Names names, String namespace) {
-      String fieldName = getRequiredText(field, "name", "No field name");
-      String fieldDoc = getOptionalText(field, "doc");
-      JsonNode fieldTypeNode = field.get("type");
-      if (fieldTypeNode == null) {
-        throw new SchemaParseException("No field type: " + field);
-      }
-
-      Schema fieldSchema = null;
-      if (fieldTypeNode.isTextual()) {
-        Schema schemaField = names.get(fieldTypeNode.textValue());
-        if (schemaField == null) {
-          schemaField = names.get(namespace + "." + fieldTypeNode.textValue());
-        }
-        if (schemaField == null) {
-          throw new SchemaParseException(fieldTypeNode + " is not a defined 
name." + " The type of the \"" + fieldName
-              + "\" field must be a defined name or a {\"type\": ...} 
expression.");
-        }
-        fieldSchema = schemaField;
-      } else if (fieldTypeNode.isObject()) {
-        fieldSchema = resolveSchema(fieldTypeNode, names, namespace);
-        if (fieldSchema == null) {
-          fieldSchema = Schema.parseCompleteSchema(fieldTypeNode, names, 
namespace);
-        }
-      } else if (fieldTypeNode.isArray()) {
-        List<Schema> unionTypes = new ArrayList<>();
-
-        fieldTypeNode.forEach((JsonNode node) -> {
-          Schema subSchema = null;
-          if (node.isTextual()) {
-            subSchema = names.get(node.asText());
-            if (subSchema == null) {
-              subSchema = names.get(namespace + "." + node.asText());
-            }
-          } else if (node.isObject()) {
-            subSchema = Schema.parseCompleteSchema(node, names, namespace);
-          } else {
-            throw new SchemaParseException("Illegal type in union : " + node);
-          }
-          if (subSchema == null) {
-            throw new SchemaParseException("Null element in union : " + node);
-          }
-          unionTypes.add(subSchema);
-        });
-
-        fieldSchema = Schema.createUnion(unionTypes);
-      }
-
-      if (fieldSchema == null) {
-        throw new SchemaParseException("Can't find type for field " + 
fieldName);
-      }
-      Field.Order order = Field.Order.ASCENDING;
-      JsonNode orderNode = field.get("order");
-      if (orderNode != null)
-        order = 
Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH));
-      JsonNode defaultValue = field.get("default");
-
-      if (defaultValue != null
-          && (Type.FLOAT.equals(fieldSchema.getType()) || 
Type.DOUBLE.equals(fieldSchema.getType()))
-          && defaultValue.isTextual()) {
-        try {
-          defaultValue = new 
DoubleNode(Double.valueOf(defaultValue.textValue()));
-        } catch (NumberFormatException ex) {
-          throw new SchemaParseException(
-              "Can't parse number '" + defaultValue.textValue() + "' for field 
'" + fieldName);
-        }
-      }
-
-      Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, 
true, order);
-      Iterator<String> i = field.fieldNames();
-      while (i.hasNext()) { // add field props
-        String prop = i.next();
-        if (!FIELD_RESERVED.contains(prop))
-          f.addProp(prop, field.get(prop));
-      }
-      f.aliases = parseAliases(field);
-
-      return f;
-    }
-
   }
 
   static class Name {
@@ -832,13 +742,13 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
       return full;
     }
 
-    public void writeName(Names names, JsonGenerator gen) throws IOException {
+    public void writeName(String currentNamespace, JsonGenerator gen) throws 
IOException {
       if (name != null)
         gen.writeStringField("name", name);
       if (space != null) {
-        if (!space.equals(names.space()))
+        if (!space.equals(currentNamespace))
           gen.writeStringField("namespace", space);
-      } else if (names.space() != null) { // null within non-null
+      } else if (currentNamespace != null) { // null within non-null
         gen.writeStringField("namespace", "");
       }
     }
@@ -849,8 +759,8 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
 
     /**
      * Determine if full name must be written. There are 2 cases for true :
-     * defaultSpace != from this.space or name is already a Schema.Type (int, 
array
-     * ...)
+     * {@code defaultSpace} != from {@code this.space} or name is already a
+     * {@code Schema.Type} (int, array, ...)
      *
      * @param defaultSpace : default name space.
      * @return true if full name must be written.
@@ -925,22 +835,25 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
       Set<String> result = new LinkedHashSet<>();
       if (aliases != null)
         for (Name alias : aliases)
-          result.add(alias.full);
+          if (alias.space == null && name.space != null)
+            result.add("." + alias.name);
+          else
+            result.add(alias.full);
       return result;
     }
 
-    public boolean writeNameRef(Names names, JsonGenerator gen) throws 
IOException {
-      if (this.equals(names.get(name))) {
-        gen.writeString(name.getQualified(names.space()));
-        return true;
-      } else if (name.name != null) {
-        names.put(name, this);
+    public boolean writeNameRef(Set<String> knownNames, String 
currentNamespace, JsonGenerator gen) throws IOException {
+      if (name.name != null) {
+        if (!knownNames.add(name.full)) {
+          gen.writeString(name.getQualified(currentNamespace));
+          return true;
+        }
       }
       return false;
     }
 
-    public void writeName(Names names, JsonGenerator gen) throws IOException {
-      name.writeName(names, gen);
+    public void writeName(String currentNamespace, JsonGenerator gen) throws 
IOException {
+      name.writeName(currentNamespace, gen);
     }
 
     public boolean equalNames(NamedSchema that) {
@@ -969,8 +882,8 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * and need to watch for recursion.
    */
   public static class SeenPair {
-    private Object s1;
-    private Object s2;
+    private final Object s1;
+    private final Object s2;
 
     public SeenPair(Object s1, Object s2) {
       this.s1 = s1;
@@ -992,7 +905,6 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
   private static final ThreadLocal<Set<SeenPair>> SEEN_EQUALS = 
ThreadLocalWithInitial.of(HashSet::new);
   private static final ThreadLocal<Map<Schema, Schema>> SEEN_HASHCODE = 
ThreadLocalWithInitial.of(IdentityHashMap::new);
 
-  @SuppressWarnings(value = "unchecked")
   private static class RecordSchema extends NamedSchema {
     private List<Field> fields;
     private Map<String, Field> fieldMap;
@@ -1015,10 +927,10 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    public Field getField(String fieldname) {
+    public Field getField(String fieldName) {
       if (fieldMap == null)
         throw new AvroRuntimeException("Schema fields not set yet");
-      return fieldMap.get(fieldname);
+      return fieldMap.get(fieldName);
     }
 
     @Override
@@ -1070,7 +982,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
         return false;
       if (!propsEqual(that))
         return false;
-      Set seen = SEEN_EQUALS.get();
+      Set<Schema.SeenPair> seen = SEEN_EQUALS.get();
       SeenPair here = new SeenPair(this, o);
       if (seen.contains(here))
         return true; // prevent stack overflow
@@ -1100,36 +1012,34 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
-      if (writeNameRef(names, gen))
+    void toJson(Set<String> knownNames, String currentNamespace, JsonGenerator 
gen) throws IOException {
+      if (writeNameRef(knownNames, currentNamespace, gen))
         return;
-      String savedSpace = names.space; // save namespace
       gen.writeStartObject();
       gen.writeStringField("type", isError ? "error" : "record");
-      writeName(names, gen);
-      names.space = name.space; // set default namespace
-      if (this.getDoc() != null)
+      writeName(currentNamespace, gen);
+      if (this.getDoc() != null) {
         gen.writeStringField("doc", this.getDoc());
+      }
 
       if (fields != null) {
         gen.writeFieldName("fields");
-        fieldsToJson(names, gen);
+        fieldsToJson(knownNames, name.space, gen);
       }
 
       writeProps(gen);
       aliasesToJson(gen);
       gen.writeEndObject();
-      names.space = savedSpace; // restore namespace
     }
 
     @Override
-    void fieldsToJson(Names names, JsonGenerator gen) throws IOException {
+    void fieldsToJson(Set<String> knownNames, String namespace, JsonGenerator 
gen) throws IOException {
       gen.writeStartArray();
       for (Field f : fields) {
         gen.writeStartObject();
         gen.writeStringField("name", f.name());
         gen.writeFieldName("type");
-        f.schema().toJson(names, gen);
+        f.schema().toJson(knownNames, namespace, gen);
         if (f.doc() != null)
           gen.writeStringField("doc", f.doc());
         if (f.hasDefaultValue()) {
@@ -1138,7 +1048,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
         }
         if (f.order() != Field.Order.ASCENDING)
           gen.writeStringField("order", f.order().name);
-        if (f.aliases != null && f.aliases.size() != 0) {
+        if (f.aliases != null && !f.aliases.isEmpty()) {
           gen.writeFieldName("aliases");
           gen.writeStartArray();
           for (String alias : f.aliases)
@@ -1210,12 +1120,12 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
-      if (writeNameRef(names, gen))
+    void toJson(Set<String> knownNames, String currentNamespace, JsonGenerator 
gen) throws IOException {
+      if (writeNameRef(knownNames, currentNamespace, gen))
         return;
       gen.writeStartObject();
       gen.writeStringField("type", "enum");
-      writeName(names, gen);
+      writeName(currentNamespace, gen);
       if (getDoc() != null)
         gen.writeStringField("doc", getDoc());
       gen.writeArrayFieldStart("symbols");
@@ -1259,11 +1169,11 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
+    void toJson(Set<String> knownNames, String namespace, JsonGenerator gen) 
throws IOException {
       gen.writeStartObject();
       gen.writeStringField("type", "array");
       gen.writeFieldName("items");
-      elementType.toJson(names, gen);
+      elementType.toJson(knownNames, namespace, gen);
       writeProps(gen);
       gen.writeEndObject();
     }
@@ -1298,11 +1208,11 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
+    void toJson(Set<String> knownNames, String currentNamespace, JsonGenerator 
gen) throws IOException {
       gen.writeStartObject();
       gen.writeStringField("type", "map");
       gen.writeFieldName("values");
-      valueType.toJson(names, gen);
+      valueType.toJson(knownNames, currentNamespace, gen);
       writeProps(gen);
       gen.writeEndObject();
     }
@@ -1375,10 +1285,10 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
+    void toJson(Set<String> knownNames, String currentNamespace, JsonGenerator 
gen) throws IOException {
       gen.writeStartArray();
       for (Schema type : types)
-        type.toJson(names, gen);
+        type.toJson(knownNames, currentNamespace, gen);
       gen.writeEndArray();
     }
 
@@ -1419,12 +1329,12 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
 
     @Override
-    void toJson(Names names, JsonGenerator gen) throws IOException {
-      if (writeNameRef(names, gen))
+    void toJson(Set<String> knownNames, String currentNamespace, JsonGenerator 
gen) throws IOException {
+      if (writeNameRef(knownNames, currentNamespace, gen))
         return;
       gen.writeStartObject();
       gen.writeStringField("type", "fixed");
-      writeName(names, gen);
+      writeName(currentNamespace, gen);
       if (getDoc() != null)
         gen.writeStringField("doc", getDoc());
       gen.writeNumberField("size", size);
@@ -1488,7 +1398,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * may refer to it by name.
    */
   public static class Parser {
-    private Names names = new Names();
+    final ParseContext context;
     private final NameValidator validate;
     private boolean validateDefaults = true;
 
@@ -1498,11 +1408,19 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
 
     public Parser(final NameValidator validate) {
       this.validate = validate != null ? validate : 
NameValidator.NO_VALIDATION;
+      context = new ParseContext(this.validate);
+    }
+
+    public Parser(final ParseContext context) {
+      this.validate = context.nameValidator;
+      this.context = context;
     }
 
     /**
      * Adds the provided types to the set of defined, named types known to this
-     * parser. deprecated: use addTypes(Iterable<Schema> types)
+     * parser.
+     *
+     * @deprecated use addTypes(Iterable<Schema> types)
      */
     @Deprecated
     public Parser addTypes(Map<String, Schema> types) {
@@ -1515,16 +1433,13 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
      */
     public Parser addTypes(Iterable<Schema> types) {
       for (Schema s : types)
-        names.add(s);
+        context.put(s);
       return this;
     }
 
     /** Returns the set of defined, named types known to this parser. */
     public Map<String, Schema> getTypes() {
-      Map<String, Schema> result = new LinkedHashMap<>();
-      for (Schema s : names.values())
-        result.put(s.getFullName(), s);
-      return result;
+      return context.typesByName();
     }
 
     /** Enable or disable default value validation. */
@@ -1543,21 +1458,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
      * names known to this parser.
      */
     public Schema parse(File file) throws IOException {
-      return parse(FACTORY.createParser(file), false);
-    }
-
-    public List<Schema> parse(Iterable<File> sources) throws IOException {
-      final List<Schema> schemas = new ArrayList<>();
-      for (File source : sources) {
-        final Schema emptySchema = 
parseNamesDeclared(FACTORY.createParser(source));
-        schemas.add(emptySchema);
-      }
-
-      for (File source : sources) {
-        parseFieldsOnly(FACTORY.createParser(source));
-      }
-
-      return schemas;
+      return parse(FACTORY.createParser(file), false, true);
     }
 
     /**
@@ -1565,7 +1466,8 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
      * names known to this parser. The input stream stays open after the 
parsing.
      */
     public Schema parse(InputStream in) throws IOException {
-      return 
parse(FACTORY.createParser(in).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE), 
true);
+      JsonParser parser = 
FACTORY.createParser(in).disable(JsonParser.Feature.AUTO_CLOSE_SOURCE);
+      return parse(parser, true, true);
     }
 
     /** Read a schema from one or more json strings */
@@ -1582,43 +1484,41 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
      */
     public Schema parse(String s) {
       try {
-        return parse(FACTORY.createParser(s), false);
+        return parse(FACTORY.createParser(s), false, true);
       } catch (IOException e) {
         throw new SchemaParseException(e);
       }
     }
 
-    private static interface ParseFunction {
-      Schema parse(JsonNode node) throws IOException;
+    public Schema parseInternal(String s) {
+      try {
+        return parse(FACTORY.createParser(s), false, false);
+      } catch (IOException e) {
+        throw new SchemaParseException(e);
+      }
     }
 
-    private Schema runParser(JsonParser parser, ParseFunction f) throws 
IOException {
-      NameValidator saved = validateNames.get();
+    private Schema parse(JsonParser parser, boolean allowDanglingContent, 
boolean resolveSchema) throws IOException {
+      NameValidator saved = VALIDATE_NAMES.get();
       boolean savedValidateDefaults = VALIDATE_DEFAULTS.get();
       try {
-        validateNames.set(validate);
+        // This ensured we're using the same validation as the ParseContext.
+        // This is most relevant for field names.
+        VALIDATE_NAMES.set(validate);
         VALIDATE_DEFAULTS.set(validateDefaults);
         JsonNode jsonNode = MAPPER.readTree(parser);
-        return f.parse(jsonNode);
-      } catch (JsonParseException e) {
-        throw new SchemaParseException(e);
-      } finally {
-        parser.close();
-        validateNames.set(saved);
-        VALIDATE_DEFAULTS.set(savedValidateDefaults);
-      }
-    }
-
-    private Schema parse(JsonParser parser, final boolean 
allowDanglingContent) throws IOException {
-      return this.runParser(parser, (JsonNode jsonNode) -> {
-        Schema schema = Schema.parse(jsonNode, names);
+        Schema schema = Schema.parse(jsonNode, context, null);
+        if (resolveSchema) {
+          context.commit();
+          schema = context.resolve(schema);
+        }
         if (!allowDanglingContent) {
           String dangling;
           StringWriter danglingWriter = new StringWriter();
           int numCharsReleased = parser.releaseBuffered(danglingWriter);
           if (numCharsReleased == -1) {
             ByteArrayOutputStream danglingOutputStream = new 
ByteArrayOutputStream();
-            parser.releaseBuffered(danglingOutputStream); // if input isnt 
chars above it must be bytes
+            parser.releaseBuffered(danglingOutputStream); // if input isn't 
chars above it must be bytes
             dangling = new String(danglingOutputStream.toByteArray(), 
StandardCharsets.UTF_8).trim();
           } else {
             dangling = danglingWriter.toString().trim();
@@ -1628,17 +1528,14 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
           }
         }
         return schema;
-      });
-    }
-
-    private Schema parseNamesDeclared(JsonParser parser) throws IOException {
-      return this.runParser(parser, (JsonNode jsonNode) -> 
Schema.parseNamesDeclared(jsonNode, names, names.space));
-    }
-
-    private Schema parseFieldsOnly(JsonParser parser) throws IOException {
-      return this.runParser(parser, (JsonNode jsonNode) -> 
Schema.parseCompleteSchema(jsonNode, names, names.space));
+      } catch (JsonParseException e) {
+        throw new SchemaParseException(e);
+      } finally {
+        parser.close();
+        VALIDATE_NAMES.set(saved);
+        VALIDATE_DEFAULTS.set(savedValidateDefaults);
+      }
     }
-
   }
 
   /**
@@ -1647,9 +1544,9 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    *
    * @param file The file to read the schema from.
    * @return The freshly built Schema.
-   * @throws IOException if there was trouble reading the contents or they are
+   * @throws IOException if there was trouble reading the contents, or they are
    *                     invalid
-   * @deprecated use {@link Schema.Parser} instead.
+   * @deprecated use {@link SchemaParser} instead.
    */
   @Deprecated
   public static Schema parse(File file) throws IOException {
@@ -1662,9 +1559,9 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    *
    * @param in The input stream to read the schema from.
    * @return The freshly built Schema.
-   * @throws IOException if there was trouble reading the contents or they are
+   * @throws IOException if there was trouble reading the contents, or they are
    *                     invalid
-   * @deprecated use {@link Schema.Parser} instead.
+   * @deprecated use {@link SchemaParser} instead.
    */
   @Deprecated
   public static Schema parse(InputStream in) throws IOException {
@@ -1674,7 +1571,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
   /**
    * Construct a schema from <a href="https://json.org/";>JSON</a> text.
    *
-   * @deprecated use {@link Schema.Parser} instead.
+   * @deprecated use {@link SchemaParser} instead.
    */
   @Deprecated
   public static Schema parse(String jsonSchema) {
@@ -1685,7 +1582,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * Construct a schema from <a href="https://json.org/";>JSON</a> text.
    *
    * @param validate true if names should be validated, false if not.
-   * @deprecated use {@link Schema.Parser} instead.
+   * @deprecated use {@link SchemaParser} instead.
    */
   @Deprecated
   public static Schema parse(String jsonSchema, boolean validate) {
@@ -1759,19 +1656,31 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
   }
 
-  private static ThreadLocal<NameValidator> validateNames = 
ThreadLocalWithInitial
+  private static final ThreadLocal<NameValidator> VALIDATE_NAMES = 
ThreadLocalWithInitial
       .of(() -> NameValidator.UTF_VALIDATOR);
 
   private static String validateName(String name) {
-    NameValidator.Result result = validateNames.get().validate(name);
+    NameValidator.Result result = VALIDATE_NAMES.get().validate(name);
     if (!result.isOK()) {
       throw new SchemaParseException(result.getErrors());
     }
     return name;
   }
 
+  /*
+   * @deprecated Scheduled for removal. Do Not Use!
+   */
+  @Deprecated
   public static void setNameValidator(final NameValidator validator) {
-    Schema.validateNames.set(validator);
+    Schema.VALIDATE_NAMES.set(validator);
+  }
+
+  /*
+   * @deprecated Scheduled for removal. Do Not Use!
+   */
+  @Deprecated
+  public static NameValidator getNameValidator() {
+    return Schema.VALIDATE_NAMES.get();
   }
 
   private static final ThreadLocal<Boolean> VALIDATE_DEFAULTS = 
ThreadLocalWithInitial.of(() -> true);
@@ -1784,6 +1693,22 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     return defaultValue;
   }
 
+  /*
+   * @deprecated Scheduled for removal. Do Not Use!
+   */
+  @Deprecated
+  public static void setValidateDefaults(boolean validateDefaults) {
+    Schema.VALIDATE_DEFAULTS.set(validateDefaults);
+  }
+
+  /*
+   * @deprecated Scheduled for removal. Do Not Use!
+   */
+  @Deprecated
+  public static boolean getValidateDefaults() {
+    return Schema.VALIDATE_DEFAULTS.get();
+  }
+
   /**
    * Checks if a JSON value matches the schema.
    *
@@ -1867,267 +1792,187 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     }
   }
 
-  /**
-   * Parse named schema in order to fill names map. This method does not parse
-   * field of record/error schema.
-   *
-   * @param schema           : json schema representation.
-   * @param names            : map of named schema.
-   * @param currentNameSpace : current working name space.
-   * @return schema.
-   */
-  static Schema parseNamesDeclared(JsonNode schema, Names names, String 
currentNameSpace) {
+  /** @see #parse(String) */
+  static Schema parse(JsonNode schema, ParseContext context, String 
currentNameSpace) {
     if (schema == null) {
-      return null;
-    }
-    if (schema.isObject()) {
-
-      String type = Schema.getOptionalText(schema, "type");
-      Name name = null;
-
-      String doc = null;
-      Schema result = null;
+      throw new SchemaParseException("Cannot parse <null> schema");
+    } else if (schema.isTextual()) { // name
+      return context.find(schema.textValue(), currentNameSpace);
+    } else if (schema.isObject()) {
+      String type = getRequiredText(schema, "type", "No type");
       final boolean isTypeError = "error".equals(type);
-      final boolean isTypeRecord = "record".equals(type);
-      final boolean isTypeEnum = "enum".equals(type);
-      final boolean isTypeFixed = "fixed".equals(type);
-
-      if (isTypeRecord || isTypeError || isTypeEnum || isTypeFixed) {
-        String space = getOptionalText(schema, "namespace");
-        doc = getOptionalText(schema, "doc");
-        if (space == null)
-          space = currentNameSpace;
-        name = new Name(getRequiredText(schema, "name", "No name in schema"), 
space);
-      }
-      if (isTypeRecord || isTypeError) { // record
-        result = new RecordSchema(name, doc, isTypeError);
-        names.add(result);
-        JsonNode fieldsNode = schema.get("fields");
-
-        if (fieldsNode == null || !fieldsNode.isArray())
-          throw new SchemaParseException("Record has no fields: " + schema);
-        exploreFields(fieldsNode, names, name != null ? name.space : null);
-
-      } else if (isTypeEnum) { // enum
-        JsonNode symbolsNode = schema.get("symbols");
-        if (symbolsNode == null || !symbolsNode.isArray())
-          throw new SchemaParseException("Enum has no symbols: " + schema);
-        LockableArrayList<String> symbols = new 
LockableArrayList<>(symbolsNode.size());
-        for (JsonNode n : symbolsNode)
-          symbols.add(n.textValue());
-        JsonNode enumDefault = schema.get("default");
-        String defaultSymbol = null;
-        if (enumDefault != null)
-          defaultSymbol = enumDefault.textValue();
-        result = new EnumSchema(name, doc, symbols, defaultSymbol);
-        names.add(result);
+      if (PRIMITIVES.containsKey(type)) { // primitive
+        return parsePrimitive(schema, type);
+      } else if ("record".equals(type) || isTypeError) { // record
+        return parseRecord(schema, context, currentNameSpace, isTypeError);
+      } else if ("enum".equals(type)) { // enum
+        return parseEnum(schema, context, currentNameSpace);
       } else if (type.equals("array")) { // array
-        JsonNode itemsNode = schema.get("items");
-        if (itemsNode == null)
-          throw new SchemaParseException("Array has no items type: " + schema);
-        final Schema items = Schema.parseNamesDeclared(itemsNode, names, 
currentNameSpace);
-        result = Schema.createArray(items);
+        return parseArray(schema, context, currentNameSpace);
       } else if (type.equals("map")) { // map
-        JsonNode valuesNode = schema.get("values");
-        if (valuesNode == null)
-          throw new SchemaParseException("Map has no values type: " + schema);
-        final Schema values = Schema.parseNamesDeclared(valuesNode, names, 
currentNameSpace);
-        result = Schema.createMap(values);
-      } else if (isTypeFixed) { // fixed
-        JsonNode sizeNode = schema.get("size");
-        if (sizeNode == null || !sizeNode.isInt())
-          throw new SchemaParseException("Invalid or no size: " + schema);
-        result = new FixedSchema(name, doc, sizeNode.intValue());
-        if (name != null)
-          names.add(result);
-      } else if (PRIMITIVES.containsKey(type)) {
-        result = Schema.create(PRIMITIVES.get(type));
-      }
-      if (result != null) {
-        Set<String> reserved = SCHEMA_RESERVED;
-        if (isTypeEnum) {
-          reserved = ENUM_RESERVED;
-        }
-        Schema.addProperties(schema, reserved, result);
+        return parseMap(schema, context, currentNameSpace);
+      } else if ("fixed".equals(type)) { // fixed
+        return parseFixed(schema, context, currentNameSpace);
+      } else { // For unions with self reference
+        return context.find(type, currentNameSpace);
       }
-      return result;
-    } else if (schema.isArray()) {
-      List<Schema> subs = new ArrayList<>(schema.size());
-      schema.forEach((JsonNode item) -> {
-        Schema sub = Schema.parseNamesDeclared(item, names, currentNameSpace);
-        if (sub != null) {
-          subs.add(sub);
-        }
-      });
-      return Schema.createUnion(subs);
-    } else if (schema.isTextual()) {
-      String value = schema.asText();
-      return names.get(value);
+    } else if (schema.isArray()) { // union
+      return parseUnion(schema, context, currentNameSpace);
+    } else {
+      throw new SchemaParseException("Schema not yet supported: " + schema);
     }
-    return null;
   }
 
-  private static void addProperties(JsonNode schema, Set<String> reserved, 
Schema avroSchema) {
-    Iterator<String> i = schema.fieldNames();
-    while (i.hasNext()) { // add properties
-      String prop = i.next();
-      if (!reserved.contains(prop)) // ignore reserved
-        avroSchema.addProp(prop, schema.get(prop));
-    }
-    // parse logical type if present
-    avroSchema.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(avroSchema);
-    // names.space(savedSpace); // restore space
-    if (avroSchema instanceof NamedSchema) {
-      Set<String> aliases = parseAliases(schema);
-      if (aliases != null) // add aliases
-        for (String alias : aliases)
-          avroSchema.addAlias(alias);
-    }
+  private static Schema parsePrimitive(JsonNode schema, String type) {
+    Schema result = create(PRIMITIVES.get(type));
+    parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED);
+    return result;
   }
 
-  /**
-   * Explore record fields in order to fill names map with inner defined named
-   * types.
-   *
-   * @param fieldsNode : json node for field.
-   * @param names      : names map.
-   * @param nameSpace  : current working namespace.
-   */
-  private static void exploreFields(JsonNode fieldsNode, Names names, String 
nameSpace) {
+  private static Schema parseRecord(JsonNode schema, ParseContext context, 
String currentNameSpace,
+      boolean isTypeError) {
+    Name name = parseName(schema, currentNameSpace);
+    String doc = parseDoc(schema);
+    Schema result = new RecordSchema(name, doc, isTypeError);
+    context.put(result);
+
+    JsonNode fieldsNode = schema.get("fields");
+    if (fieldsNode == null || !fieldsNode.isArray())
+      throw new SchemaParseException("Record has no fields: " + schema);
+    List<Field> fields = new ArrayList<>();
     for (JsonNode field : fieldsNode) {
-      final JsonNode fieldType = field.get("type");
-      if (fieldType != null) {
-        if (fieldType.isObject()) {
-          parseNamesDeclared(fieldType, names, nameSpace);
-        } else if (fieldType.isArray()) {
-          exploreFields(fieldType, names, nameSpace);
-        } else if (fieldType.isTextual() && field.isObject()) {
-          parseNamesDeclared(field, names, nameSpace);
-        }
-      }
+      Field f = parseField(field, context, name.space);
+      fields.add(f);
+      if (f.schema().getLogicalType() == null && getOptionalText(field, 
LOGICAL_TYPE_PROP) != null)
+        LOG.warn(
+            "Ignored the {}.{}.logicalType property (\"{}\"). It should 
probably be nested inside the \"type\" for the field.",
+            name, f.name(), getOptionalText(field, "logicalType"));
     }
+    result.setFields(fields);
+    parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED);
+    parseAliases(schema, result);
+    return result;
   }
 
-  /**
-   * in complement of parseNamesDeclared, this method parse schema in details.
-   *
-   * @param schema       : json schema.
-   * @param names        : names map.
-   * @param currentSpace : current working name space.
-   * @return complete schema.
-   */
-  static Schema parseCompleteSchema(JsonNode schema, Names names, String 
currentSpace) {
-    if (schema == null) {
-      throw new SchemaParseException("Cannot parse <null> schema");
-    }
-    if (schema.isTextual()) {
-      String type = schema.asText();
-      Schema avroSchema = names.get(type);
-      if (avroSchema == null) {
-        avroSchema = names.get(currentSpace + "." + type);
-      }
-      return avroSchema;
-    }
-    if (schema.isArray()) {
-      List<Schema> schemas = StreamSupport.stream(schema.spliterator(), false)
-          .map((JsonNode sub) -> parseCompleteSchema(sub, names, 
currentSpace)).collect(Collectors.toList());
-      return Schema.createUnion(schemas);
-    }
-    if (schema.isObject()) {
-      Schema result = null;
-      String type = getRequiredText(schema, "type", "No type");
-      Name name = null;
+  private static Field parseField(JsonNode field, ParseContext context, String 
namespace) {
+    String fieldName = getRequiredText(field, "name", "No field name");
+    String fieldDoc = parseDoc(field);
+    JsonNode fieldTypeNode = field.get("type");
+    if (fieldTypeNode == null)
+      throw new SchemaParseException("No field type: " + field);
+    Schema fieldSchema = parse(fieldTypeNode, context, namespace);
 
-      final boolean isTypeError = "error".equals(type);
-      final boolean isTypeRecord = "record".equals(type);
-      final boolean isTypeArray = "array".equals(type);
+    Field.Order order = Field.Order.ASCENDING;
+    JsonNode orderNode = field.get("order");
+    if (orderNode != null)
+      order = 
Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH));
 
-      if (isTypeRecord || isTypeError || "enum".equals(type) || 
"fixed".equals(type)) {
-        // named schema
-        String space = getOptionalText(schema, "namespace");
+    JsonNode defaultValue = field.get("default");
+    if (defaultValue != null && (Type.FLOAT.equals(fieldSchema.getType()) || 
Type.DOUBLE.equals(fieldSchema.getType()))
+        && defaultValue.isTextual())
+      defaultValue = new 
DoubleNode(Double.parseDouble(defaultValue.textValue()));
 
-        if (space == null)
-          space = currentSpace;
-        name = new Name(getRequiredText(schema, "name", "No name in schema"), 
space);
+    Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, 
order);
+    parseProperties(field, f, FIELD_RESERVED);
+    f.aliases = parseAliases(field);
+    return f;
+  }
 
-        result = names.get(name);
-        if (result == null) {
-          throw new SchemaParseException("Unparsed field type " + name);
-        }
-      }
-      if (isTypeRecord || isTypeError) {
-        if (result != null && !result.hasFields()) {
-          final List<Field> fields = new ArrayList<>();
-          JsonNode fieldsNode = schema.get("fields");
-          if (fieldsNode == null || !fieldsNode.isArray())
-            throw new SchemaParseException("Record has no fields: " + schema);
-
-          for (JsonNode field : fieldsNode) {
-            Field f = Field.parse(field, names, name.space);
-
-            fields.add(f);
-            if (f.schema.getLogicalType() == null && getOptionalText(field, 
LOGICAL_TYPE_PROP) != null)
-              LOG.warn(
-                  "Ignored the {}.{}.logicalType property (\"{}\"). It should 
probably be nested inside the \"type\" for the field.",
-                  name, f.name, getOptionalText(field, "logicalType"));
-          }
-          result.setFields(fields);
-        }
-      } else if (isTypeArray) {
-        JsonNode items = schema.get("items");
-        Schema schemaItems = parseCompleteSchema(items, names, currentSpace);
-        result = Schema.createArray(schemaItems);
-      } else if ("map".equals(type)) {
-        JsonNode values = schema.get("values");
-        Schema mapItems = parseCompleteSchema(values, names, currentSpace);
-        result = Schema.createMap(mapItems);
-      } else if (result == null) {
-        result = names.get(currentSpace + "." + type);
-        if (result == null) {
-          result = names.get(type);
-        }
-      }
+  private static Schema parseEnum(JsonNode schema, ParseContext context, 
String currentNameSpace) {
+    Name name = parseName(schema, currentNameSpace);
+    String doc = parseDoc(schema);
 
-      Set<String> reserved = SCHEMA_RESERVED;
-      if ("enum".equals(type)) {
-        reserved = ENUM_RESERVED;
-      }
-      Schema.addProperties(schema, reserved, result);
-      return result;
+    JsonNode symbolsNode = schema.get("symbols");
+    if (symbolsNode == null || !symbolsNode.isArray()) {
+      throw new SchemaParseException("Enum has no symbols: " + schema);
     }
-    return null;
+    LockableArrayList<String> symbols = new 
LockableArrayList<>(symbolsNode.size());
+    for (JsonNode n : symbolsNode)
+      symbols.add(n.textValue());
+    JsonNode enumDefault = schema.get("default");
+    String defaultSymbol = null;
+    if (enumDefault != null) {
+      defaultSymbol = enumDefault.textValue();
+    }
+
+    Schema result = new EnumSchema(name, doc, symbols, defaultSymbol);
+    context.put(result);
+    parsePropertiesAndLogicalType(schema, result, ENUM_RESERVED);
+    parseAliases(schema, result);
+    return result;
   }
 
-  static Schema parse(JsonNode schema, Names names) {
-    if (schema == null) {
-      throw new SchemaParseException("Cannot parse <null> schema");
-    }
+  private static Schema parseArray(JsonNode schema, ParseContext context, 
String currentNameSpace) {
+    Schema result;
+    JsonNode itemsNode = schema.get("items");
+    if (itemsNode == null)
+      throw new SchemaParseException("Array has no items type: " + schema);
+    result = new ArraySchema(parse(itemsNode, context, currentNameSpace));
+    parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED);
+    return result;
+  }
+
+  private static Schema parseMap(JsonNode schema, ParseContext context, String 
currentNameSpace) {
+    Schema result;
+    JsonNode valuesNode = schema.get("values");
+    if (valuesNode == null)
+      throw new SchemaParseException("Map has no values type: " + schema);
+    result = new MapSchema(parse(valuesNode, context, currentNameSpace));
+    parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED);
+    return result;
+  }
+
+  private static Schema parseFixed(JsonNode schema, ParseContext context, 
String currentNameSpace) {
+    Name name = parseName(schema, currentNameSpace);
+    String doc = parseDoc(schema);
 
-    Schema result = Schema.parseNamesDeclared(schema, names, names.space);
-    Schema.parseCompleteSchema(schema, names, names.space);
+    JsonNode sizeNode = schema.get("size");
+    if (sizeNode == null || !sizeNode.isInt())
+      throw new SchemaParseException("Invalid or no size: " + schema);
 
+    Schema result = new FixedSchema(name, doc, sizeNode.intValue());
+    context.put(result);
+    parsePropertiesAndLogicalType(schema, result, SCHEMA_RESERVED);
+    parseAliases(schema, result);
     return result;
   }
 
-  static Schema resolveSchema(JsonNode schema, Names names, String 
currentNameSpace) {
-    String np = currentNameSpace;
-    String nodeName = getOptionalText(schema, "name");
-    if (nodeName != null) {
-      final JsonNode nameSpace = schema.get("namespace");
-      StringBuilder fullName = new StringBuilder();
-      if (nameSpace != null && nameSpace.isTextual()) {
-        fullName.append(nameSpace.asText()).append(".");
-        np = nameSpace.asText();
-      }
-      fullName.append(nodeName);
-      Schema schema1 = names.get(fullName.toString());
+  private static UnionSchema parseUnion(JsonNode schema, ParseContext context, 
String currentNameSpace) {
+    LockableArrayList<Schema> types = new LockableArrayList<>(schema.size());
+    for (JsonNode typeNode : schema)
+      types.add(parse(typeNode, context, currentNameSpace));
+    return new UnionSchema(types);
+  }
 
-      if (schema1 != null && schema1.getType() == Type.RECORD && 
!schema1.hasFields()) {
-        Schema.parseCompleteSchema(schema, names, np);
-      }
-      return schema1;
-    }
-    return null;
+  private static void parsePropertiesAndLogicalType(JsonNode jsonNode, Schema 
result, Set<String> propertiesToSkip) {
+    parseProperties(jsonNode, result, propertiesToSkip);
+    // parse logical type if present
+    result.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(result);
+  }
+
+  private static void parseProperties(JsonNode schema, JsonProperties result, 
Set<String> propertiesToSkip) {
+    schema.fieldNames().forEachRemaining(prop -> { // add properties
+      if (!propertiesToSkip.contains(prop)) // ignore reserved
+        result.addProp(prop, schema.get(prop));
+    });
+  }
+
+  private static Name parseName(JsonNode schema, String currentNameSpace) {
+    String space = getOptionalText(schema, "namespace");
+    if (space == null)
+      space = currentNameSpace;
+    return new Name(getRequiredText(schema, "name", "No name in schema"), 
space);
+  }
+
+  private static String parseDoc(JsonNode schema) {
+    return getOptionalText(schema, "doc");
+  }
+
+  private static void parseAliases(JsonNode schema, Schema result) {
+    Set<String> aliases = parseAliases(schema);
+    if (aliases != null) // add aliases
+      for (String alias : aliases)
+        result.addAlias(alias);
   }
 
   static Set<String> parseAliases(JsonNode node) {
@@ -2199,13 +2044,14 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     Map<Name, Map<String, String>> fieldAliases = new HashMap<>(1);
     getAliases(reader, seen, aliases, fieldAliases);
 
-    if (aliases.size() == 0 && fieldAliases.size() == 0)
+    if (aliases.isEmpty() && fieldAliases.isEmpty())
       return writer; // no aliases
 
     seen.clear();
     return applyAliases(writer, seen, aliases, fieldAliases);
   }
 
+  @SuppressWarnings("DataFlowIssue")
   private static Schema applyAliases(Schema s, Map<Schema, Schema> seen, 
Map<Name, Name> aliases,
       Map<Name, Map<String, String>> fieldAliases) {
 
@@ -2261,6 +2107,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
     return result;
   }
 
+  @SuppressWarnings("DataFlowIssue")
   private static void getAliases(Schema schema, Map<Schema, Schema> seen, 
Map<Name, Name> aliases,
       Map<Name, Map<String, String>> fieldAliases) {
     if (schema instanceof NamedSchema) {
@@ -2322,10 +2169,11 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
    * <tt>true</tt> in the lock() method. It's legal to call lock() any number 
of
    * times. Any lock() other than the first one is a no-op.
    *
-   * This class throws <tt>IllegalStateException</tt> if a mutating operation 
is
-   * performed after being locked. Since modifications through iterator also 
use
+   * If a mutating operation is performed after being locked, it throws an
+   * <tt>IllegalStateException</tt>. Since modifications through iterator also 
use
    * the list's mutating operations, this effectively blocks all modifications.
    */
+  @SuppressWarnings("unused")
   static class LockableArrayList<E> extends ArrayList<E> {
     private static final long serialVersionUID = 1L;
     private boolean locked = false;
@@ -2341,6 +2189,7 @@ public abstract class Schema extends JsonProperties 
implements Serializable {
       super(types);
     }
 
+    @SafeVarargs
     public LockableArrayList(E... types) {
       super(types.length);
       Collections.addAll(this, types);
diff --git a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java 
b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
index dfb3c01f3..e3eb2d9ab 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/SchemaParser.java
@@ -84,7 +84,7 @@ public class SchemaParser {
    *                              available
    * @see UtfTextUtils
    */
-  public Schema parse(File file) throws IOException, SchemaParseException {
+  public ParseResult parse(File file) throws IOException, SchemaParseException 
{
     return parse(file, null);
   }
 
@@ -99,7 +99,7 @@ public class SchemaParser {
    *                              suppressed underlying parse exceptions if
    *                              available
    */
-  public Schema parse(File file, Charset charset) throws IOException, 
SchemaParseException {
+  public ParseResult parse(File file, Charset charset) throws IOException, 
SchemaParseException {
     return parse(file.toPath(), charset);
   }
 
@@ -115,7 +115,7 @@ public class SchemaParser {
    *                              available
    * @see UtfTextUtils
    */
-  public Schema parse(Path file) throws IOException, SchemaParseException {
+  public ParseResult parse(Path file) throws IOException, SchemaParseException 
{
     return parse(file, null);
   }
 
@@ -130,7 +130,7 @@ public class SchemaParser {
    *                              suppressed underlying parse exceptions if
    *                              available
    */
-  public Schema parse(Path file, Charset charset) throws IOException, 
SchemaParseException {
+  public ParseResult parse(Path file, Charset charset) throws IOException, 
SchemaParseException {
     URI inputDir = file.getParent().toUri();
     try (InputStream stream = Files.newInputStream(file)) {
       String formattedSchema = UtfTextUtils.readAllBytes(stream, charset);
@@ -138,6 +138,24 @@ public class SchemaParser {
     }
   }
 
+  /**
+   * Parse an Avro schema from a file written with a specific character set.
+   *
+   * @param location the location of the schema resource
+   * @param charset  the character set of the schema resource
+   * @return the schema
+   * @throws IOException          when the schema cannot be read
+   * @throws SchemaParseException if parsing the schema failed; contains
+   *                              suppressed underlying parse exceptions if
+   *                              available
+   */
+  public ParseResult parse(URI location, Charset charset) throws IOException, 
SchemaParseException {
+    try (InputStream stream = location.toURL().openStream()) {
+      String formattedSchema = UtfTextUtils.readAllBytes(stream, charset);
+      return parse(location, formattedSchema);
+    }
+  }
+
   /**
    * Parse an Avro schema from an input stream. The stream content is assumed 
to
    * be UTF-8 text. Note that the stream stays open after reading.
@@ -150,7 +168,7 @@ public class SchemaParser {
    *                              available
    * @see UtfTextUtils
    */
-  public Schema parse(InputStream in) throws IOException, SchemaParseException 
{
+  public ParseResult parse(InputStream in) throws IOException, 
SchemaParseException {
     return parse(in, null);
   }
 
@@ -166,7 +184,7 @@ public class SchemaParser {
    *                              suppressed underlying parse exceptions if
    *                              available
    */
-  public Schema parse(InputStream in, Charset charset) throws IOException, 
SchemaParseException {
+  public ParseResult parse(InputStream in, Charset charset) throws 
IOException, SchemaParseException {
     return parse(UtfTextUtils.readAllBytes(in, charset));
   }
 
@@ -180,7 +198,7 @@ public class SchemaParser {
    *                              suppressed underlying parse exceptions if
    *                              available
    */
-  public Schema parse(Reader in) throws IOException, SchemaParseException {
+  public ParseResult parse(Reader in) throws IOException, SchemaParseException 
{
     return parse(UtfTextUtils.readAllChars(in));
   }
 
@@ -193,7 +211,7 @@ public class SchemaParser {
    *                              suppressed underlying parse exceptions if
    *                              available
    */
-  public Schema parse(CharSequence text) throws SchemaParseException {
+  public ParseResult parse(CharSequence text) throws SchemaParseException {
     try {
       return parse(null, text);
     } catch (IOException e) {
@@ -220,15 +238,14 @@ public class SchemaParser {
    * @throws RuntimeException     if thrown by one of the parsers
    * @throws SchemaParseException when all parsers fail
    */
-  private Schema parse(URI baseUri, CharSequence formattedSchema) throws 
IOException, SchemaParseException {
+  private ParseResult parse(URI baseUri, CharSequence formattedSchema) throws 
IOException, SchemaParseException {
     List<SchemaParseException> parseExceptions = new ArrayList<>();
     for (FormattedSchemaParser formattedSchemaParser : formattedSchemaParsers) 
{
       try {
         Schema schema = formattedSchemaParser.parse(parseContext, baseUri, 
formattedSchema);
-        if (parseContext.hasNewSchemas()) {
+        if (parseContext.hasNewSchemas() || schema != null) {
           // Parsing succeeded: return the result.
-          parseContext.commit();
-          return schema;
+          return parseContext.commit(schema);
         }
       } catch (SchemaParseException e) {
         parseContext.rollback();
@@ -246,4 +263,32 @@ public class SchemaParser {
     parseExceptions.forEach(parseException::addSuppressed);
     throw parseException;
   }
+
+  /**
+   * Get all parsed schemata.
+   *
+   * @return all parsed schemas, in the order they were parsed
+   */
+  public List<Schema> getParsedNamedSchemas() {
+    return parseContext.resolveAllSchemas();
+  }
+
+  // Temporary method to reduce PR size
+  @Deprecated
+  public Schema resolve(ParseResult result) {
+    return result.mainSchema();
+  }
+
+  public interface ParseResult {
+    /**
+     * The main schema parsed from a file. Can be any schema, or {@code null} 
if the
+     * parsed file has no "main" schema.
+     */
+    Schema mainSchema();
+
+    /**
+     * The list of named schemata that were parsed.
+     */
+    List<Schema> parsedNamedSchemas();
+  }
 }
diff --git 
a/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java 
b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java
index c3a25a5e5..83285d371 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/util/SchemaResolver.java
@@ -18,26 +18,19 @@
 package org.apache.avro.util;
 
 import org.apache.avro.AvroTypeException;
-import org.apache.avro.JsonProperties;
-import org.apache.avro.ParseContext;
-import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
-import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
-import java.util.stream.Collectors;
 
+import static java.util.Objects.requireNonNull;
 import static org.apache.avro.Schema.Type.ARRAY;
 import static org.apache.avro.Schema.Type.ENUM;
 import static org.apache.avro.Schema.Type.FIXED;
@@ -47,7 +40,8 @@ import static org.apache.avro.Schema.Type.UNION;
 
 /**
  * Utility class to resolve schemas that are unavailable at the point they are
- * referenced in the IDL.
+ * referenced in a schema file. This class is meant for internal use: use at
+ * your own risk!
  */
 public final class SchemaResolver {
 
@@ -111,67 +105,6 @@ public final class SchemaResolver {
     }
   }
 
-  /**
-   * Clone the provided schema while resolving all unreferenced schemas.
-   *
-   * @param parseContext the parse context with known names
-   * @param schema       the schema to resolve
-   * @return a copy of the schema with all schemas resolved
-   */
-  public static Schema resolve(final ParseContext parseContext, Schema schema) 
{
-    if (schema == null) {
-      return null;
-    }
-    ResolvingVisitor visitor = new ResolvingVisitor(schema, 
parseContext::resolve);
-    return Schemas.visit(schema, visitor);
-  }
-
-  /**
-   * Clone all provided schemas while resolving all unreferenced schemas.
-   *
-   * @param parseContext the parse context with known names
-   * @param schemas      the schemas to resolve
-   * @return a copy of all schemas with all schemas resolved
-   */
-  public static Collection<Schema> resolve(final ParseContext parseContext, 
Collection<Schema> schemas) {
-    ResolvingVisitor visitor = new ResolvingVisitor(null, 
parseContext::resolve);
-    return schemas.stream().map(schema -> Schemas.visit(schema, 
visitor.withRoot(schema))).collect(Collectors.toList());
-  }
-
-  /**
-   * Will clone the provided protocol while resolving all unreferenced schemas
-   *
-   * @param parseContext the parse context with known names
-   * @param protocol     the protocol to resolve
-   * @return a copy of the protocol with all schemas resolved
-   */
-  public static Protocol resolve(ParseContext parseContext, final Protocol 
protocol) {
-    // Create an empty copy of the protocol
-    Protocol result = new Protocol(protocol.getName(), protocol.getDoc(), 
protocol.getNamespace());
-    protocol.getObjectProps().forEach(((JsonProperties) result)::addProp);
-
-    ResolvingVisitor visitor = new ResolvingVisitor(null, 
parseContext::resolve);
-    Function<Schema, Schema> resolver = schema -> Schemas.visit(schema, 
visitor.withRoot(schema));
-
-    // Resolve all schemata in the protocol.
-    
result.setTypes(protocol.getTypes().stream().map(resolver).collect(Collectors.toList()));
-    Map<String, Protocol.Message> resultMessages = result.getMessages();
-    protocol.getMessages().forEach((name, oldValue) -> {
-      Protocol.Message newValue;
-      if (oldValue.isOneWay()) {
-        newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), 
oldValue,
-            resolver.apply(oldValue.getRequest()));
-      } else {
-        Schema request = resolver.apply(oldValue.getRequest());
-        Schema response = resolver.apply(oldValue.getResponse());
-        Schema errors = resolver.apply(oldValue.getErrors());
-        newValue = result.createMessage(oldValue.getName(), oldValue.getDoc(), 
oldValue, request, response, errors);
-      }
-      resultMessages.put(name, newValue);
-    });
-    return result;
-  }
-
   /**
    * This visitor checks if the current schema is fully resolved.
    */
@@ -216,78 +149,35 @@ public final class SchemaResolver {
    * This visitor creates clone of the visited Schemata, minus the specified
    * schema properties, and resolves all unresolved schemas.
    */
-  public static final class ResolvingVisitor implements SchemaVisitor<Schema> {
+  public static final class ResolvingVisitor implements SchemaVisitor<Void> {
     private static final Set<Schema.Type> CONTAINER_SCHEMA_TYPES = 
EnumSet.of(RECORD, ARRAY, MAP, UNION);
     private static final Set<Schema.Type> NAMED_SCHEMA_TYPES = 
EnumSet.of(RECORD, ENUM, FIXED);
 
     private final Function<String, Schema> symbolTable;
-    private final Set<String> schemaPropertiesToRemove;
     private final IdentityHashMap<Schema, Schema> replace;
 
-    private final Schema root;
-
-    public ResolvingVisitor(final Schema root, final Function<String, Schema> 
symbolTable,
-        String... schemaPropertiesToRemove) {
-      this(root, symbolTable, new 
HashSet<>(Arrays.asList(schemaPropertiesToRemove)));
-    }
-
-    public ResolvingVisitor(final Schema root, final Function<String, Schema> 
symbolTable,
-        Set<String> schemaPropertiesToRemove) {
+    public ResolvingVisitor(final Function<String, Schema> symbolTable) {
       this.replace = new IdentityHashMap<>();
       this.symbolTable = symbolTable;
-      this.schemaPropertiesToRemove = schemaPropertiesToRemove;
-
-      this.root = root;
-    }
-
-    public ResolvingVisitor withRoot(Schema root) {
-      return new ResolvingVisitor(root, symbolTable, schemaPropertiesToRemove);
     }
 
     @Override
     public SchemaVisitorAction visitTerminal(final Schema terminal) {
       Schema.Type type = terminal.getType();
-      Schema newSchema;
       if (CONTAINER_SCHEMA_TYPES.contains(type)) {
         if (!replace.containsKey(terminal)) {
           throw new IllegalStateException("Schema " + terminal + " must be 
already processed");
         }
-        return SchemaVisitorAction.CONTINUE;
-      } else if (type == ENUM) {
-        newSchema = Schema.createEnum(terminal.getName(), terminal.getDoc(), 
terminal.getNamespace(),
-            terminal.getEnumSymbols(), terminal.getEnumDefault());
-      } else if (type == FIXED) {
-        newSchema = Schema.createFixed(terminal.getName(), terminal.getDoc(), 
terminal.getNamespace(),
-            terminal.getFixedSize());
       } else {
-        newSchema = Schema.create(type);
+        replace.put(terminal, terminal);
       }
-      copyProperties(terminal, newSchema);
-      replace.put(terminal, newSchema);
       return SchemaVisitorAction.CONTINUE;
     }
 
-    public void copyProperties(final Schema first, final Schema second) {
-      // Logical type
-      Optional.ofNullable(first.getLogicalType()).ifPresent(logicalType -> 
logicalType.addToSchema(second));
-
-      // Aliases (if applicable)
-      if (NAMED_SCHEMA_TYPES.contains(first.getType())) {
-        first.getAliases().forEach(second::addAlias);
-      }
-
-      // Other properties
-      first.getObjectProps().forEach((name, value) -> {
-        if (!schemaPropertiesToRemove.contains(name)) {
-          second.addProp(name, value);
-        }
-      });
-    }
-
     @Override
     public SchemaVisitorAction visitNonTerminal(final Schema nt) {
       Schema.Type type = nt.getType();
-      if (type == RECORD) {
+      if (type == RECORD && !replace.containsKey(nt)) {
         if (isUnresolvedSchema(nt)) {
           // unresolved schema will get a replacement that we already 
encountered,
           // or we will attempt to resolve.
@@ -298,19 +188,32 @@ public final class SchemaResolver {
           }
           Schema replacement = replace.computeIfAbsent(resSchema, schema -> {
             Schemas.visit(schema, this);
-            return replace.get(schema);
+            return replace.get(schema); // This is not what the visitor 
returns!
           });
           replace.put(nt, replacement);
         } else {
-          // Create a clone without fields. Fields will be added in 
afterVisitNonTerminal.
-          Schema newSchema = Schema.createRecord(nt.getName(), nt.getDoc(), 
nt.getNamespace(), nt.isError());
-          copyProperties(nt, newSchema);
-          replace.put(nt, newSchema);
+          // Create a clone without fields or properties. They will be added in
+          // afterVisitNonTerminal, as they can both create circular 
references.
+          // (see org.apache.avro.TestCircularReferences as an example)
+          replace.put(nt, Schema.createRecord(nt.getName(), nt.getDoc(), 
nt.getNamespace(), nt.isError()));
         }
       }
       return SchemaVisitorAction.CONTINUE;
     }
 
+    public void copyProperties(final Schema first, final Schema second) {
+      // Logical type
+      Optional.ofNullable(first.getLogicalType()).ifPresent(logicalType -> 
logicalType.addToSchema(second));
+
+      // Aliases (if applicable)
+      if (NAMED_SCHEMA_TYPES.contains(first.getType())) {
+        first.getAliases().forEach(second::addAlias);
+      }
+
+      // Other properties
+      first.getObjectProps().forEach(second::addProp);
+    }
+
     @Override
     public SchemaVisitorAction afterVisitNonTerminal(final Schema nt) {
       Schema.Type type = nt.getType();
@@ -328,6 +231,7 @@ public final class SchemaResolver {
               newFields.add(new Schema.Field(field, 
replace.get(field.schema())));
             }
             newSchema.setFields(newFields);
+            copyProperties(nt, newSchema);
           }
         }
         return SchemaVisitorAction.CONTINUE;
@@ -335,15 +239,15 @@ public final class SchemaResolver {
         List<Schema> types = nt.getTypes();
         List<Schema> newTypes = new ArrayList<>(types.size());
         for (Schema sch : types) {
-          newTypes.add(replace.get(sch));
+          newTypes.add(requireNonNull(replace.get(sch)));
         }
         newSchema = Schema.createUnion(newTypes);
         break;
       case ARRAY:
-        newSchema = Schema.createArray(replace.get(nt.getElementType()));
+        newSchema = 
Schema.createArray(requireNonNull(replace.get(nt.getElementType())));
         break;
       case MAP:
-        newSchema = Schema.createMap(replace.get(nt.getValueType()));
+        newSchema = 
Schema.createMap(requireNonNull(replace.get(nt.getValueType())));
         break;
       default:
         throw new IllegalStateException("Illegal type " + type + ", schema " + 
nt);
@@ -354,14 +258,18 @@ public final class SchemaResolver {
     }
 
     @Override
-    public Schema get() {
-      return replace.get(root);
+    public Void get() {
+      return null;
+    }
+
+    public Schema getResolved(Schema schema) {
+      return requireNonNull(replace.get(schema),
+          () -> "Unknown schema: " + schema.getFullName() + ". Was it resolved 
before?");
     }
 
     @Override
     public String toString() {
-      return "ResolvingVisitor{symbolTable=" + symbolTable + ", 
schemaPropertiesToRemove=" + schemaPropertiesToRemove
-          + ", replace=" + replace + '}';
+      return "ResolvingVisitor{symbolTable=" + symbolTable + ", replace=" + 
replace + '}';
     }
   }
 }
diff --git 
a/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java 
b/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java
index 4802aea07..db7dc6405 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/DummySchemaParser.java
@@ -38,7 +38,7 @@ public class DummySchemaParser implements 
FormattedSchemaParser {
   @Override
   public Schema parse(ParseContext parseContext, URI baseUri, CharSequence 
formattedSchema)
       throws IOException, SchemaParseException {
-    LOGGER.info("Using DummySchemaParser for {}", formattedSchema);
+    LOGGER.debug("Using DummySchemaParser for {}", formattedSchema);
     if (SCHEMA_TEXT_ONE.contentEquals(formattedSchema)) {
       parseContext.put(FIXED_SCHEMA);
       return FIXED_SCHEMA;
diff --git a/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java 
b/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java
index afd3a6435..d40a6cc9d 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/ParseContextTest.java
@@ -71,8 +71,8 @@ public class ParseContextTest {
 
     ParseContext context = new ParseContext();
     for (Schema.Type type : primitives) {
-      Schema first = context.resolve(type.getName());
-      Schema second = context.resolve(type.getName());
+      Schema first = context.find(type.getName(), null);
+      Schema second = context.find(type.getName(), null);
       assertEquals(first, second);
       assertNotSame(first, second);
 
@@ -85,26 +85,31 @@ public class ParseContextTest {
   public void validateSchemaRetrievalFailure() {
     Schema unknown = Schema.createFixed("unknown", null, null, 0);
 
-    Schema unresolved = fooBarBaz.resolve("unknown");
+    Schema unresolved = fooBarBaz.find("unknown", null);
     assertTrue(SchemaResolver.isUnresolvedSchema(unresolved));
     assertEquals(unknown.getFullName(), 
SchemaResolver.getUnresolvedSchemaName(unresolved));
   }
 
   @Test
   public void validateSchemaRetrievalByFullName() {
-    assertSame(fooRecord, fooBarBaz.resolve(fooRecord.getFullName()));
+    assertSame(fooRecord, fooBarBaz.find(fooRecord.getFullName(), null));
+  }
+
+  @Test
+  public void validateSchemaRetrievalBySimpleName() {
+    assertSame(fooRecord, fooBarBaz.find(fooRecord.getName(), 
fooRecord.getNamespace()));
   }
 
   @Test
   public void verifyPutIsIdempotent() {
     ParseContext context = new ParseContext();
-    assertNotEquals(fooRecord, context.resolve(fooRecord.getFullName()));
+    assertNotEquals(fooRecord, context.find(fooRecord.getFullName(), null));
 
     context.put(fooRecord);
-    assertEquals(fooRecord, context.resolve(fooRecord.getFullName()));
+    assertEquals(fooRecord, context.find(fooRecord.getFullName(), null));
 
     context.put(fooRecord);
-    assertEquals(fooRecord, context.resolve(fooRecord.getFullName()));
+    assertEquals(fooRecord, context.find(fooRecord.getFullName(), null));
   }
 
   @Test
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java 
b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java
index 41f0474c9..6c4e35df9 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java
@@ -17,8 +17,6 @@
  */
 package org.apache.avro;
 
-import static org.junit.jupiter.api.Assertions.*;
-
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
@@ -44,14 +42,21 @@ import com.fasterxml.jackson.databind.node.IntNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.NullNode;
 import com.fasterxml.jackson.databind.node.TextNode;
-
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
 import org.apache.avro.generic.GenericData;
-
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import static java.util.Objects.requireNonNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
 public class TestSchema {
   @Test
   void splitSchemaBuild() {
@@ -583,26 +588,27 @@ public class TestSchema {
 
   @Test
   void testParseMultipleFile() throws IOException {
-    URL directory = 
Thread.currentThread().getContextClassLoader().getResource("multipleFile");
+    URL directory = 
requireNonNull(Thread.currentThread().getContextClassLoader().getResource("multipleFile"));
     File f1 = new File(directory.getPath(), "ApplicationEvent.avsc");
     File f2 = new File(directory.getPath(), "DocumentInfo.avsc");
     File f3 = new File(directory.getPath(), "MyResponse.avsc");
     Assertions.assertTrue(f1.exists(), "File not exist for test " + 
f1.getPath());
     Assertions.assertTrue(f2.exists(), "File not exist for test " + 
f2.getPath());
     Assertions.assertTrue(f3.exists(), "File not exist for test " + 
f3.getPath());
-
-    final List<Schema> schemas = new Schema.Parser().parse(Arrays.asList(f1, 
f2, f3));
+    SchemaParser parser = new SchemaParser();
+    parser.parse(f1);
+    parser.parse(f2);
+    parser.parse(f3);
+    final List<Schema> schemas = parser.getParsedNamedSchemas();
     Assertions.assertEquals(3, schemas.size());
     Schema schemaAppEvent = schemas.get(0);
     Schema schemaDocInfo = schemas.get(1);
     Schema schemaResponse = schemas.get(2);
-
     Assertions.assertNotNull(schemaAppEvent);
     Assertions.assertEquals(3, schemaAppEvent.getFields().size());
     Field documents = schemaAppEvent.getField("documents");
     Schema docSchema = documents.schema().getTypes().get(1).getElementType();
     Assertions.assertEquals(docSchema, schemaDocInfo);
-
     Assertions.assertNotNull(schemaDocInfo);
     Assertions.assertNotNull(schemaResponse);
   }
@@ -610,9 +616,9 @@ public class TestSchema {
   @Test
   void add_types() {
     String schemaRecord2 = "{\"type\":\"record\", \"name\":\"record2\", 
\"fields\": ["
-        + "  {\"name\":\"f1\", \"type\":\"record1\" }" + "]}";
-    // register schema1 in schema.
+        + "  {\"name\":\"f1\", \"type\":\"record1\" }" + "]}"; // register 
schema1 in schema.
     Schema schemaRecord1 = Schema.createRecord("record1", "doc", "", false);
+    schemaRecord1.setFields(Collections.singletonList(new Field("name", 
Schema.create(Type.STRING))));
     Schema.Parser parser = new 
Schema.Parser().addTypes(Collections.singleton(schemaRecord1));
 
     // parse schema for record2 that contains field for schema1.
@@ -628,6 +634,6 @@ public class TestSchema {
    */
   @Test
   void testParserNullValidate() {
-    new 
Schema.Parser(null).parse("{\"type\":\"record\",\"name\":\"\",\"fields\":[]}"); 
// Empty name
+    new Schema.Parser((NameValidator) 
null).parse("{\"type\":\"record\",\"name\":\"\",\"fields\":[]}"); // Empty name
   }
 }
diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java 
b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java
index dc0c77431..29c8f65be 100644
--- a/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java
+++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchemaParser.java
@@ -23,6 +23,7 @@ import org.junit.jupiter.api.Test;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.StringReader;
+import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -35,13 +36,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 class TestSchemaParser {
   private static final Schema SCHEMA_REAL = Schema.createFixed("Real", null, 
"tests", 42);
   private static final String SCHEMA_JSON = SCHEMA_REAL.toString(false);
+  private static final Charset[] UTF_CHARSETS = { StandardCharsets.UTF_8, 
StandardCharsets.UTF_16LE,
+      StandardCharsets.UTF_16BE };
 
   @Test
   void testParseFile() throws IOException {
     Path tempFile = Files.createTempFile("TestSchemaParser", null);
     Files.write(tempFile, singletonList(SCHEMA_JSON));
 
-    Schema schema = new SchemaParser().parse(tempFile.toFile());
+    Schema schema = new SchemaParser().parse(tempFile.toFile()).mainSchema();
     assertEquals(SCHEMA_REAL, schema);
   }
 
@@ -50,38 +53,49 @@ class TestSchemaParser {
     Path tempFile = Files.createTempFile("TestSchemaParser", null);
     Files.write(tempFile, singletonList(SCHEMA_JSON));
 
-    Schema schema = new SchemaParser().parse(tempFile);
+    Schema schema = new SchemaParser().parse(tempFile).mainSchema();
+    assertEquals(SCHEMA_REAL, schema);
+  }
+
+  @Test
+  void testParseURI() throws IOException {
+    Path tempFile = Files.createTempFile("TestSchemaParser", null);
+    Charset charset = UTF_CHARSETS[(int) Math.floor(UTF_CHARSETS.length * 
Math.random())];
+    Files.write(tempFile, singletonList(SCHEMA_JSON), charset);
+
+    Schema schema = new SchemaParser().parse(tempFile.toUri(), 
null).mainSchema();
     assertEquals(SCHEMA_REAL, schema);
   }
 
   @Test
   void testParseReader() throws IOException {
-    Schema schema = new SchemaParser().parse(new StringReader(SCHEMA_JSON));
+    Schema schema = new SchemaParser().parse(new 
StringReader(SCHEMA_JSON)).mainSchema();
     assertEquals(SCHEMA_REAL, schema);
   }
 
   @Test
   void testParseStream() throws IOException {
-    Schema schema = new SchemaParser().parse(new 
ByteArrayInputStream(SCHEMA_JSON.getBytes(StandardCharsets.UTF_16)));
+    Schema schema = new SchemaParser().parse(new 
ByteArrayInputStream(SCHEMA_JSON.getBytes(StandardCharsets.UTF_16)))
+        .mainSchema();
     assertEquals(SCHEMA_REAL, schema);
   }
 
   @Test
   void testParseTextWithFallbackJsonParser() {
-    Schema schema = new SchemaParser().parse(SCHEMA_JSON);
+    Schema schema = new SchemaParser().parse(SCHEMA_JSON).mainSchema();
     assertEquals(SCHEMA_REAL, schema);
   }
 
   @Test
   void testParseByCustomParser() {
-    Schema schema = new 
SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ONE);
+    Schema schema = new 
SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ONE).mainSchema();
     assertEquals(DummySchemaParser.FIXED_SCHEMA, schema);
   }
 
   @Test
   void testSingleParseError() {
     SchemaParseException parseException = 
assertThrows(SchemaParseException.class,
-        () -> new SchemaParser().parse("foo"));
+        () -> new SchemaParser().parse("foo").mainSchema());
     assertEquals(JsonParseException.class, 
parseException.getCause().getClass());
     assertEquals(0, parseException.getSuppressed().length);
   }
@@ -89,7 +103,7 @@ class TestSchemaParser {
   @Test
   void testMultipleParseErrors() {
     SchemaParseException parseException = 
assertThrows(SchemaParseException.class,
-        () -> new SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ERROR));
+        () -> new 
SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_ERROR).mainSchema());
     assertTrue(parseException.getMessage().startsWith("Could not parse the 
schema"));
     Throwable[] suppressed = parseException.getSuppressed();
     assertEquals(2, suppressed.length);
@@ -100,7 +114,7 @@ class TestSchemaParser {
   @Test
   void testIOFailureWhileParsingText() {
     AvroRuntimeException exception = assertThrows(AvroRuntimeException.class,
-        () -> new 
SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_IO_ERROR));
+        () -> new 
SchemaParser().parse(DummySchemaParser.SCHEMA_TEXT_IO_ERROR).mainSchema());
     assertEquals(IOException.class, exception.getCause().getClass());
     assertEquals(DummySchemaParser.IO_ERROR_MESSAGE, 
exception.getCause().getMessage());
   }
diff --git 
a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java
 
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java
index 6a1a13789..e3e1a2ddb 100644
--- 
a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java
+++ 
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java
@@ -17,6 +17,10 @@
  */
 package org.apache.avro.compiler.idl;
 
+import org.apache.avro.Protocol;
+import org.apache.avro.Schema;
+import org.apache.avro.compiler.schema.Schemas;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -26,10 +30,6 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 
-import org.apache.avro.Protocol;
-import org.apache.avro.Schema;
-import org.apache.avro.compiler.schema.Schemas;
-
 /**
  * Utility class to resolve schemas that are unavailable at the time they are
  * referenced in the IDL.
@@ -103,12 +103,12 @@ final class SchemaResolver {
   /**
    * Will clone the provided protocol while resolving all unreferenced schemas
    *
-   * @param protocol
-   * @return
+   * @param protocol a protocol with possibly unresolved schema references
+   * @return a protocol without unresolved schema references
    */
   static Protocol resolve(final Protocol protocol) {
     Protocol result = new Protocol(protocol.getName(), protocol.getDoc(), 
protocol.getNamespace());
-    final Collection<Schema> types = protocol.getTypes();
+    final Collection<Schema> types = protocol.getUnresolvedTypes();
     // replace unresolved schemas.
     List<Schema> newSchemas = new ArrayList<>(types.size());
     IdentityHashMap<Schema, Schema> replacements = new IdentityHashMap<>();
diff --git 
a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
 
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
index e7c854c57..53675f4a0 100644
--- 
a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
+++ 
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java
@@ -28,6 +28,7 @@ import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.avro.Conversion;
 import org.apache.avro.Conversions;
@@ -47,6 +49,7 @@ import org.apache.avro.Protocol.Message;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.SchemaNormalization;
+import org.apache.avro.SchemaParser;
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.StringType;
@@ -468,12 +471,16 @@ public class SpecificCompiler {
    * Generates Java classes for a number of schema files.
    */
   public static void compileSchema(File[] srcFiles, File dest) throws 
IOException {
-    Schema.Parser parser = new Schema.Parser();
+    SchemaParser parser = new SchemaParser();
 
     for (File src : srcFiles) {
-      Schema schema = parser.parse(src);
+      parser.parse(src);
+    }
+    // FIXME: use lastModified() without causing a NoSuchMethodError in the 
build
+    File lastModifiedSourceFile = 
Stream.of(srcFiles).max(Comparator.comparing(File::lastModified)).orElse(null);
+    for (Schema schema : parser.getParsedNamedSchemas()) {
       SpecificCompiler compiler = new SpecificCompiler(schema);
-      compiler.compileToDestination(src, dest);
+      compiler.compileToDestination(lastModifiedSourceFile, dest);
     }
   }
 
diff --git 
a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj 
b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
index af2480ce9..2d312794c 100644
--- a/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
+++ b/lang/java/compiler/src/main/javacc/org/apache/avro/compiler/idl/idl.jj
@@ -1287,6 +1287,8 @@ Schema ImportSchema() : {
   <SCHEMA> importFile = JsonString() ";"
     {
       try (InputStream stream=findFile(importFile).openStream()){
+        // This usage of Schema.Parser should not be changed.
+        // Remove this whole (old) IDL parser instead.
         Parser parser = new Schema.Parser();
         parser.addTypes(names.values());          // inherit names
         Schema value = parser.parse(stream);
diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java 
b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java
index 75d0f73fa..3b3f6b97c 100644
--- a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java
+++ b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlFile.java
@@ -17,6 +17,7 @@
  */
 package org.apache.avro.idl;
 
+import org.apache.avro.ParseContext;
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
 
@@ -32,24 +33,24 @@ import java.util.stream.Collectors;
  * the protocol containing the schemas.
  */
 public class IdlFile {
-  private final Schema mainSchema;
-  private final Protocol protocol;
-  private final Map<String, Schema> namedSchemas;
+  private final Object resolveLock = new Object();
+  private volatile ParseContext parseContext;
+  private Schema mainSchema;
+  private Protocol protocol;
+  private Map<String, Schema> namedSchemas;
   private final List<String> warnings;
 
-  IdlFile(Protocol protocol, List<String> warnings) {
-    this(protocol.getTypes(), null, protocol, warnings);
+  IdlFile(Protocol protocol, ParseContext context, List<String> warnings) {
+    this(context, null, protocol, warnings);
   }
 
-  IdlFile(Schema mainSchema, Iterable<Schema> schemas, List<String> warnings) {
-    this(schemas, mainSchema, null, warnings);
+  IdlFile(Schema mainSchema, ParseContext context, List<String> warnings) {
+    this(context, mainSchema, null, warnings);
   }
 
-  private IdlFile(Iterable<Schema> schemas, Schema mainSchema, Protocol 
protocol, List<String> warnings) {
+  private IdlFile(ParseContext context, Schema mainSchema, Protocol protocol, 
List<String> warnings) {
+    this.parseContext = context;
     this.namedSchemas = new LinkedHashMap<>();
-    for (Schema namedSchema : schemas) {
-      this.namedSchemas.put(namedSchema.getFullName(), namedSchema);
-    }
     this.mainSchema = mainSchema;
     this.protocol = protocol;
     this.warnings = Collections.unmodifiableList(new ArrayList<>(warnings));
@@ -59,13 +60,55 @@ public class IdlFile {
    * The (main) schema defined by the IDL file.
    */
   public Schema getMainSchema() {
+    if (mainSchema == null) {
+      return null;
+    }
+    ensureSchemasAreResolved();
     return mainSchema;
   }
 
+  private void ensureSchemasAreResolved() {
+    if (parseContext != null) {
+      synchronized (resolveLock) {
+        if (parseContext != null) {
+          parseContext.commit();
+          List<Schema> schemas = parseContext.resolveAllSchemas();
+          schemas.forEach(schema -> namedSchemas.put(schema.getFullName(), 
schema));
+          if (mainSchema != null) {
+            mainSchema = parseContext.resolve(mainSchema);
+          }
+          if (protocol != null) {
+            protocol.setTypes(schemas);
+            Map<String, Protocol.Message> messages = protocol.getMessages();
+            for (Map.Entry<String, Protocol.Message> entry : 
messages.entrySet()) {
+              Protocol.Message oldValue = entry.getValue();
+              Protocol.Message newValue;
+              if (oldValue.isOneWay()) {
+                newValue = protocol.createMessage(oldValue.getName(), 
oldValue.getDoc(), oldValue,
+                    parseContext.resolve(oldValue.getRequest()));
+              } else {
+                Schema request = parseContext.resolve(oldValue.getRequest());
+                Schema response = parseContext.resolve(oldValue.getResponse());
+                Schema errors = parseContext.resolve(oldValue.getErrors());
+                newValue = protocol.createMessage(oldValue.getName(), 
oldValue.getDoc(), oldValue, request, response,
+                    errors);
+              }
+              entry.setValue(newValue);
+            }
+          }
+        }
+      }
+    }
+  }
+
   /**
    * The protocol defined by the IDL file.
    */
   public Protocol getProtocol() {
+    if (protocol == null) {
+      return null;
+    }
+    ensureSchemasAreResolved();
     return protocol;
   }
 
@@ -83,6 +126,7 @@ public class IdlFile {
    * The named schemas defined by the IDL file, mapped by their full name.
    */
   public Map<String, Schema> getNamedSchemas() {
+    ensureSchemasAreResolved();
     return Collections.unmodifiableMap(namedSchemas);
   }
 
@@ -95,11 +139,13 @@ public class IdlFile {
    * @return the schema, or {@code null} if it does not exist
    */
   public Schema getNamedSchema(String name) {
+    ensureSchemasAreResolved();
     return namedSchemas.get(name);
   }
 
   // Visible for testing
   String outputString() {
+    ensureSchemasAreResolved();
     if (protocol != null) {
       return protocol.toString();
     }
diff --git a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java 
b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java
index 047d16287..96d45ab6d 100644
--- a/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java
+++ b/lang/java/idl/src/main/java/org/apache/avro/idl/IdlReader.java
@@ -38,7 +38,6 @@ import org.apache.avro.JsonProperties;
 import org.apache.avro.JsonSchemaParser;
 import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
-import org.apache.avro.NameValidator;
 import org.apache.avro.ParseContext;
 import org.apache.avro.Protocol;
 import org.apache.avro.Schema;
@@ -90,6 +89,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Deque;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -145,6 +145,8 @@ public class IdlReader {
   private static final Set<String> INVALID_TYPE_NAMES = new 
HashSet<>(Arrays.asList("boolean", "int", "long", "float",
       "double", "bytes", "string", "null", "date", "time_ms", "timestamp_ms", 
"localtimestamp_ms", "uuid"));
   private static final String CLASSPATH_SCHEME = "classpath";
+  private static final Set<Schema.Type> NAMED_SCHEMA_TYPES = 
EnumSet.of(Schema.Type.RECORD, Schema.Type.ENUM,
+      Schema.Type.FIXED);
 
   private final Set<URI> readLocations;
   private final ParseContext parseContext;
@@ -153,34 +155,19 @@ public class IdlReader {
     this(new ParseContext());
   }
 
-  public IdlReader(NameValidator nameValidator) {
-    this(new ParseContext(nameValidator));
-  }
-
   public IdlReader(ParseContext parseContext) {
     readLocations = new HashSet<>();
     this.parseContext = parseContext;
   }
 
   private Schema namedSchemaOrUnresolved(String fullName) {
-    return parseContext.resolve(fullName);
+    return parseContext.find(fullName, null);
   }
 
   private void addSchema(Schema schema) {
     parseContext.put(schema);
   }
 
-  public IdlFile resolve(IdlFile unresolved) {
-    Protocol protocol = unresolved.getProtocol();
-    if (protocol == null) {
-      Schema mainSchema = SchemaResolver.resolve(parseContext, 
unresolved.getMainSchema());
-      Iterable<Schema> namedSchemas = SchemaResolver.resolve(parseContext, 
unresolved.getNamedSchemas().values());
-      return new IdlFile(mainSchema, namedSchemas, unresolved.getWarnings());
-    } else {
-      return new IdlFile(SchemaResolver.resolve(parseContext, protocol), 
unresolved.getWarnings());
-    }
-  }
-
   public IdlFile parse(Path location) throws IOException {
     return parse(location.toUri());
   }
@@ -364,9 +351,9 @@ public class IdlReader {
     @Override
     public void exitIdlFile(IdlFileContext ctx) {
       if (protocol == null) {
-        result = new IdlFile(mainSchema, parseContext.typesByName().values(), 
warnings);
+        result = new IdlFile(mainSchema, parseContext, warnings);
       } else {
-        result = new IdlFile(protocol, warnings);
+        result = new IdlFile(protocol, parseContext, warnings);
       }
     }
 
@@ -390,8 +377,10 @@ public class IdlReader {
 
     @Override
     public void exitProtocolDeclaration(ProtocolDeclarationContext ctx) {
-      if (protocol != null)
-        protocol.setTypes(parseContext.typesByName().values());
+      if (protocol != null) {
+        parseContext.commit();
+        protocol.setTypes(parseContext.resolveAllSchemas());
+      }
       if (!namespaces.isEmpty())
         popNamespace();
     }
@@ -404,6 +393,10 @@ public class IdlReader {
     @Override
     public void 
exitMainSchemaDeclaration(IdlParser.MainSchemaDeclarationContext ctx) {
       mainSchema = typeStack.pop();
+
+      if (NAMED_SCHEMA_TYPES.contains(mainSchema.getType()) && 
mainSchema.getFullName() != null) {
+        parseContext.put(mainSchema);
+      }
       assert typeStack.isEmpty();
     }
 
diff --git a/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl 
b/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl
index 1df43f7a6..6a2d19a0e 100644
--- a/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl
+++ b/lang/java/idl/src/test/idl/input/schema_syntax_schema.avdl
@@ -36,7 +36,7 @@ record StatusUpdate {
   /**
    * The new status of the process.
    */
-  Status newStatus;
+  system.Status newStatus;
   /**
    * A description why this status change occurred (optional).
    */
diff --git a/lang/java/idl/src/test/idl/input/status_schema.avdl 
b/lang/java/idl/src/test/idl/input/status_schema.avdl
index 504218a4f..e4fda1d33 100644
--- a/lang/java/idl/src/test/idl/input/status_schema.avdl
+++ b/lang/java/idl/src/test/idl/input/status_schema.avdl
@@ -1,3 +1,5 @@
+namespace system;
+
 enum Status {
        UNKNOWN, NEW, STARTUP, RUNNING, TERMINATING, SHUTDOWN, CRASHED
 } = UNKNOWN;
diff --git a/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java 
b/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java
index a3c94ff02..f9cb54f34 100644
--- a/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java
+++ b/lang/java/idl/src/test/java/org/apache/avro/idl/TestCycle.java
@@ -45,7 +45,7 @@ public class TestCycle {
   public void testCycleGeneration() throws IOException, URISyntaxException {
     final ClassLoader cl = Thread.currentThread().getContextClassLoader();
     IdlReader parser = new IdlReader();
-    IdlFile idlFile = 
parser.resolve(parser.parse(requireNonNull(cl.getResource("input/cycle.avdl")).toURI()));
+    IdlFile idlFile = 
parser.parse(requireNonNull(cl.getResource("input/cycle.avdl")).toURI());
     String json = idlFile.outputString();
     LOG.info(json);
 
diff --git 
a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java 
b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java
index 54f49b04f..632ce7f95 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/IDLMojo.java
@@ -90,7 +90,6 @@ public class IDLMojo extends AbstractAvroMojo {
         for (String warning : idlFile.getWarnings()) {
           getLog().warn(warning);
         }
-        idlFile = parser.resolve(idlFile);
         final SpecificCompiler compiler;
         final Protocol protocol = idlFile.getProtocol();
         if (protocol != null) {
diff --git 
a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java 
b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
index 36a4fc4a5..9d1a91426 100644
--- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
+++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java
@@ -20,6 +20,7 @@ package org.apache.avro.mojo;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaParseException;
+import org.apache.avro.SchemaParser;
 import org.apache.maven.plugin.MojoExecutionException;
 
 import java.io.File;
@@ -42,7 +43,7 @@ public class SchemaMojo extends AbstractAvroMojo {
    * A parser used to parse all schema files. Using a common parser will
    * facilitate the import of external schemas.
    */
-  private Schema.Parser schemaParser = new Schema.Parser();
+  private SchemaParser schemaParser = new SchemaParser();
 
   /**
    * A set of Ant-like inclusion patterns used to select files from the source
@@ -76,11 +77,11 @@ public class SchemaMojo extends AbstractAvroMojo {
       // no imported files then isolate the schemas from each other, otherwise
       // allow them to share a single schema so reuse and sharing of schema
       // is possible.
-      if (imports == null) {
-        schemas = new Schema.Parser().parse(sourceFiles);
-      } else {
-        schemas = schemaParser.parse(sourceFiles);
+      SchemaParser parser = imports == null ? new SchemaParser() : 
schemaParser;
+      for (File sourceFile : sourceFiles) {
+        parser.parse(sourceFile);
       }
+      schemas = parser.getParsedNamedSchemas();
 
       doCompile(sourceFileForModificationDetection, schemas, outputDirectory);
     } catch (IOException | SchemaParseException ex) {
diff --git a/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java 
b/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java
index 6ef827146..7f7179dad 100644
--- a/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java
+++ b/lang/java/tools/src/main/java/org/apache/avro/tool/IdlTool.java
@@ -71,7 +71,6 @@ public class IdlTool implements Tool {
       for (String warning : idlFile.getWarnings()) {
         err.println("Warning: " + warning);
       }
-      idlFile = parser.resolve(idlFile);
       p = idlFile.getProtocol();
       m = idlFile.getMainSchema();
     }

Reply via email to