[add] incomplete implementation to use generic schema traverser. Project: http://git-wip-us.apache.org/repos/asf/avro/repo Commit: http://git-wip-us.apache.org/repos/asf/avro/commit/c3384e4d Tree: http://git-wip-us.apache.org/repos/asf/avro/tree/c3384e4d Diff: http://git-wip-us.apache.org/repos/asf/avro/diff/c3384e4d
Branch: refs/heads/master Commit: c3384e4d016bac264f6577fff9543199a1fb1970 Parents: 68a5418 Author: Zoltan Farkas <[email protected]> Authored: Tue Feb 7 13:36:56 2017 -0500 Committer: Zoltan Farkas <[email protected]> Committed: Tue Feb 7 13:36:56 2017 -0500 ---------------------------------------------------------------------- .../test/java/org/apache/avro/TestFixed.java | 34 +++ lang/java/compiler/pom.xml | 11 +- .../avro/compiler/idl/SchemaResolver.java | 70 +++--- .../avro/compiler/schema/SchemaVisitor.java | 67 +++++ .../compiler/schema/SchemaVisitorAction.java | 44 ++++ .../apache/avro/compiler/schema/Schemas.java | 245 +++++++++++++++++++ 6 files changed, 426 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java ---------------------------------------------------------------------- diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java b/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java new file mode 100644 index 0000000..14ff5ce --- /dev/null +++ b/lang/java/avro/src/test/java/org/apache/avro/TestFixed.java @@ -0,0 +1,34 @@ +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.avro; + +import java.nio.ByteBuffer; +import org.junit.Assert; +import org.junit.Test; + +public class TestFixed { + + + @Test + public void testFixedDefaultValueDrop() { + Schema md5 = SchemaBuilder.builder().fixed("MD5").size(16); + Schema frec = SchemaBuilder.builder().record("test") + .fields().name("hash").type(md5).withDefault(ByteBuffer.wrap(new byte[16])).endRecord(); + Schema.Field field = frec.getField("hash"); + Assert.assertNotNull(field.defaultVal()); + } + +} http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/compiler/pom.xml ---------------------------------------------------------------------- diff --git a/lang/java/compiler/pom.xml b/lang/java/compiler/pom.xml index 3ff0101..2a35c45 100644 --- a/lang/java/compiler/pom.xml +++ b/lang/java/compiler/pom.xml @@ -138,12 +138,11 @@ <artifactId>guava</artifactId> <version>${guava.version}</version> <scope>test</scope> - <exclusions> - <exclusion> <!-- GPL --> - <groupId>com.google.code.findbugs</groupId> - <artifactId>jsr305</artifactId> - </exclusion> - </exclusions> + </dependency> + <dependency> <!-- apache licensed --> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + <version>3.0.1</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java ---------------------------------------------------------------------- diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java index 56386b3..6aed787 100644 --- a/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java +++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/idl/SchemaResolver.java @@ -21,11 +21,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map;
-import org.apache.avro.JsonProperties; -import org.apache.avro.LogicalType; import org.apache.avro.Protocol; import org.apache.avro.Schema; -import org.codehaus.jackson.JsonNode; +import org.apache.avro.compiler.schema.Schemas; /** * Utility class to resolve schemas that are unavailable at the time they are referenced in the IDL. @@ -39,12 +37,12 @@ final class SchemaResolver { private static final String UR_SCHEMA_ATTR = "org.apache.avro.compiler.idl.unresolved.name"; private static final String UR_SCHEMA_NAME = "UnresolvedSchema"; - - private static final String UR_SCHEMA_NS = "org.apache.avro.compiler"; - + + private static final String UR_SCHEMA_NS = "org.apache.avro.compiler"; + static Schema unresolvedSchema(final String name) { - - + + Schema schema = Schema.createRecord(UR_SCHEMA_NAME, "unresolved schema", UR_SCHEMA_NS, false, Collections.EMPTY_LIST); schema.addProp(UR_SCHEMA_ATTR, name); @@ -52,12 +50,13 @@ final class SchemaResolver { } static boolean isUnresolvedSchema(final Schema schema) { - return (schema.getType() == Schema.Type.RECORD && schema.getProp(UR_SCHEMA_ATTR) != null); + return (schema.getType() == Schema.Type.RECORD && schema.getProp(UR_SCHEMA_ATTR) != null + && UR_SCHEMA_NAME.equals(schema.getName()) + && UR_SCHEMA_NS.equals(schema.getNamespace())); } static String getUnresolvedSchemaName(final Schema schema) { - if (schema.getType() != Schema.Type.RECORD || !UR_SCHEMA_NAME.equals(schema.getName()) - || !UR_SCHEMA_NS.equals(schema.getNamespace())) { + if (!isUnresolvedSchema(schema)) { throw new IllegalArgumentException("Not a unresolved schema: " + schema); } String name = schema.getProp(UR_SCHEMA_ATTR); @@ -100,15 +99,10 @@ final class SchemaResolver { } result.getMessages().put(entry.getKey(), nvalue); } - copyProps(protocol, result); + Schemas.copyProperties(protocol, result); return result; } - private static void copyProps(final JsonProperties from, final JsonProperties to) { - for (Map.Entry<String, JsonNode> entry :
from.getJsonProps().entrySet()) { - to.addProp(entry.getKey(), entry.getValue()); - } - } /** * Resolve all unresolved schema references. @@ -138,27 +132,23 @@ final class SchemaResolver { final List<Schema.Field> currFields = schema.getFields(); List<Schema.Field> newFields = new ArrayList<Schema.Field>(currFields.size()); for (Schema.Field field : currFields) { Schema.Field nf = new Schema.Field(field.name(), resolve(field.schema(), protocol, resolved), field.doc(), field.defaultVal(), field.order()); - for (String alias : field.aliases()) { - nf.addAlias(alias); - } + Schemas.copyAliases(field, nf); + Schemas.copyProperties(field, nf); newFields.add(nf); } createRecord.setFields(newFields); - final LogicalType lt = schema.getLogicalType(); - if (lt != null) { - lt.addToSchema(createRecord); - } - copyProps(schema, createRecord); + Schemas.copyLogicalTypes(schema, createRecord); + Schemas.copyProperties(schema, createRecord); return createRecord; case MAP: Schema result = Schema.createMap(resolve(schema.getValueType(), protocol, resolved)); - copyProps(schema, result); + Schemas.copyProperties(schema, result); return result; case ARRAY: Schema aresult = Schema.createArray(resolve(schema.getElementType(), protocol, resolved)); - copyProps(schema, aresult); + Schemas.copyProperties(schema, aresult); return aresult; case UNION: final List<Schema> uTypes = schema.getTypes(); @@ -167,7 +157,7 @@ final class SchemaResolver { newTypes.add(resolve(s, protocol, resolved)); } Schema bresult = Schema.createUnion(newTypes); - copyProps(schema, bresult); + Schemas.copyProperties(schema, bresult); return bresult; case ENUM: case FIXED: @@ -208,31 +198,33 @@ final class SchemaResolver { final List<Schema.Field> currFields = schema.getFields(); List<Schema.Field> newFields = new ArrayList<Schema.Field>(currFields.size()); for (Schema.Field field : currFields) {
Schema.Field nf = new Schema.Field(field.name(), getResolvedSchema(field.schema(), resolved), field.doc(), field.defaultVal(), field.order()); - for (String alias : field.aliases()) { - nf.addAlias(alias); - } + Schemas.copyAliases(field, nf); + Schemas.copyProperties(field, nf); newFields.add(nf); } createRecord.setFields(newFields); - final LogicalType lt = schema.getLogicalType(); - if (lt != null) { - lt.addToSchema(createRecord); - } - copyProps(schema, createRecord); + Schemas.copyLogicalTypes(schema, createRecord); + Schemas.copyProperties(schema, createRecord); return createRecord; case MAP: - return Schema.createMap(getResolvedSchema(schema.getValueType(), resolved)); + Schema createMap = Schema.createMap(getResolvedSchema(schema.getValueType(), resolved)); + Schemas.copyProperties(schema, createMap); + return createMap; case ARRAY: - return Schema.createArray(getResolvedSchema(schema.getElementType(), resolved)); + Schema createArray = Schema.createArray(getResolvedSchema(schema.getElementType(), resolved)); + Schemas.copyProperties(schema, createArray); + return createArray; case UNION: final List<Schema> uTypes = schema.getTypes(); List<Schema> newTypes = new ArrayList<Schema>(uTypes.size()); for (Schema s : uTypes) { newTypes.add(getResolvedSchema(s, resolved)); } - return Schema.createUnion(newTypes); + Schema createUnion = Schema.createUnion(newTypes); + Schemas.copyProperties(schema, createUnion); + return createUnion; case ENUM: case FIXED: case STRING: http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitor.java ---------------------------------------------------------------------- diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitor.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitor.java new file mode 100644 index 0000000..df667f4 --- /dev/null +++
b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitor.java @@ -0,0 +1,67 @@ + /* + * Copyright (c) 2001 - 2016, Zoltan Farkas All Rights Reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ +package org.apache.avro.compiler.schema; + +import javax.annotation.CheckReturnValue; +import javax.annotation.Nonnull; +import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.avro.Schema; + +/** + * @author zoly + */ +@ParametersAreNonnullByDefault +public interface SchemaVisitor<T> { + + /** + * Invoked for schemas that do not have "child" schemas (like string, int ...) + * or for a previously encountered schema with children, + * which will be treated as a terminal. (to avoid circular recursion) + * @param terminal the schema being visited + * @return the action that controls how the traversal proceeds + */ + @Nonnull + @CheckReturnValue + SchemaVisitorAction visitTerminal(Schema terminal); + + /** + * Invoked for schema with children before proceeding to visit the children. + * @param nonTerminal the record, array, map or union schema being visited + * @return the action that controls how the traversal proceeds + */ + @Nonnull + @CheckReturnValue + SchemaVisitorAction visitNonTerminal(Schema nonTerminal); + + /** + * Invoked for schemas with children after its children have been visited.
+ * @param nonTerminal the schema whose children have been visited + * @return the action that controls how the traversal proceeds + */ + @Nonnull + @CheckReturnValue + SchemaVisitorAction afterVisitNonTerminal(Schema nonTerminal); + + + /** + * Invoked when visiting is complete. + * @return a value which will be returned by the visit method. + */ + T get(); + +} http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitorAction.java ---------------------------------------------------------------------- diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitorAction.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitorAction.java new file mode 100644 index 0000000..81157aa --- /dev/null +++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/SchemaVisitorAction.java @@ -0,0 +1,44 @@ + /* + * Copyright (c) 2001 - 2016, Zoltan Farkas All Rights Reserved. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this program; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. + */ + +package org.apache.avro.compiler.schema; + +/** + * @author zoly + */ +public enum SchemaVisitorAction { + + /** + * Continue the visit normally. + */ + CONTINUE, + /** + * Terminate the visit; the visitor's get() result is returned immediately. + */ + TERMINATE, + /** + * when returned from pre non terminal visit method the children of the non terminal are skipped.
+ * afterVisitNonTerminal for the current schema will still be invoked. + */ + SKIP_SUBTREE, + /** + * Skip visiting the siblings of this schema (when returned from visitNonTerminal, its children and afterVisitNonTerminal are skipped too). + */ + SKIP_SIBLINGS; + +} http://git-wip-us.apache.org/repos/asf/avro/blob/c3384e4d/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/Schemas.java ---------------------------------------------------------------------- diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/Schemas.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/Schemas.java new file mode 100644 index 0000000..82cf0bb --- /dev/null +++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/schema/Schemas.java @@ -0,0 +1,245 @@ +package org.apache.avro.compiler.schema; + +import avro.shaded.com.google.common.base.Function; +import avro.shaded.com.google.common.base.Supplier; +import avro.shaded.com.google.common.collect.Lists; +import java.util.ArrayDeque; +import java.util.Arrays; +import java.util.Deque; +import java.util.IdentityHashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import javax.annotation.ParametersAreNonnullByDefault; +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.apache.avro.compiler.specific.SpecificCompiler; + +/** + * Avro Schema utilities: metadata copying, Java class naming and depth-first traversal. + * + * @author zoly + */ +@ParametersAreNonnullByDefault +public final class Schemas { + + private Schemas() { + } + + public static void copyAliases(final Schema from, final Schema to) { + switch (from.getType()) { // only named types.
+ case RECORD: + case ENUM: + case FIXED: + Set<String> aliases = from.getAliases(); + for (String alias : aliases) { + to.addAlias(alias); + } + } + } + + public static void copyAliases(final Schema.Field from, final Schema.Field to) { + Set<String> aliases = from.aliases(); + for (String alias : aliases) { + to.addAlias(alias); + } + } + + public static void copyLogicalTypes(final Schema from, final Schema to) { + LogicalType logicalType = from.getLogicalType(); + if (logicalType != null) { + logicalType.addToSchema(to); + } + } + + public static void copyProperties(final JsonProperties from, final JsonProperties to) { + Map<String, Object> objectProps = from.getObjectProps(); + for (Map.Entry<String, Object> entry : objectProps.entrySet()) { + to.addProp(entry.getKey(), entry.getValue()); + } + } + + public static boolean hasGeneratedJavaClass(final Schema schema) { + Schema.Type type = schema.getType(); + switch (type) { + case ENUM: + case RECORD: + case FIXED: + return true; + default: + return false; + } + } + + public static String getJavaClassName(final Schema schema) { + String namespace = schema.getNamespace(); + if (namespace == null || namespace.isEmpty()) { + return SpecificCompiler.mangle(schema.getName()); + } else { + return namespace + '.' + SpecificCompiler.mangle(schema.getName()); + } + } + + /** + * Depth-first visit; a non terminal that was already visited is reported to visitTerminal. + * + * @param start the schema to start the traversal from + * @param visitor the visitor receiving the callbacks; its get() result is returned + */ + public static <T> T visit(final Schema start, final SchemaVisitor<T> visitor) { + // Set of Visited Schemas + IdentityHashMap<Schema, Schema> visited = new IdentityHashMap<Schema, Schema>(); + // Stack that contains the Schemas to process and afterVisitNonTerminal functions. + // Deque<Either<Schema, Supplier<SchemaVisitorAction>>> + // Using either has a cost which we want to avoid...
+ Deque<Object> dq = new ArrayDeque<Object>(); + dq.addLast(start); + Object current; + while ((current = dq.pollLast()) != null) { + if (current instanceof Supplier) { + // we are executing a non terminal post visit. + SchemaVisitorAction action = ((Supplier<SchemaVisitorAction>) current).get(); + switch (action) { + case CONTINUE: + break; + case SKIP_SUBTREE: + throw new UnsupportedOperationException("Invalid action " + action + " returned from afterVisitNonTerminal"); + case SKIP_SIBLINGS: + //CHECKSTYLE:OFF InnerAssignment + while ((current = dq.pollLast()) instanceof Schema) { + // discard the remaining sibling schemas + } + //CHECKSTYLE:ON + if (current != null) { dq.addLast(current); } // re-queue the enclosing post-visit supplier + break; + case TERMINATE: + return visitor.get(); + default: + throw new UnsupportedOperationException("Invalid action " + action); + } + } else { + Schema schema = (Schema) current; + boolean terminate; + if (!visited.containsKey(schema)) { + Schema.Type type = schema.getType(); + switch (type) { + case ARRAY: + terminate = visitNonTerminal(visitor, schema, dq, Arrays.asList(schema.getElementType())); + visited.put(schema, schema); + break; + case RECORD: + terminate = visitNonTerminal(visitor, schema, dq, + Lists.transform(Lists.reverse(schema.getFields()), new Function<Field, Schema>() { + @Override + public Schema apply(Field f) { + return f.schema(); + } + })); + visited.put(schema, schema); + break; + case UNION: + terminate = visitNonTerminal(visitor, schema, dq, schema.getTypes()); + visited.put(schema, schema); + break; + case MAP: + terminate = visitNonTerminal(visitor, schema, dq, Arrays.asList(schema.getValueType())); + visited.put(schema, schema); + break; + case NULL: + case BOOLEAN: + case BYTES: + case DOUBLE: + case ENUM: + case FIXED: + case FLOAT: + case INT: + case LONG: + case STRING: + terminate = visitTerminal(visitor, schema, dq); + break; + default: + throw new UnsupportedOperationException("Invalid type " + type); + } + + } else { + terminate = visitTerminal(visitor, schema, dq); + } + if (terminate) { + return visitor.get(); + } + } + } + return visitor.get(); + } +
+ private static boolean visitNonTerminal(final SchemaVisitor visitor, + final Schema schema, final Deque<Object> dq, + final Iterable<Schema> itSupp) { + SchemaVisitorAction action = visitor.visitNonTerminal(schema); + switch (action) { + case CONTINUE: + dq.addLast(new Supplier<SchemaVisitorAction>() { + @Override + public SchemaVisitorAction get() { + return visitor.afterVisitNonTerminal(schema); + } + }); + Iterator<Schema> it = itSupp.iterator(); + while (it.hasNext()) { + Schema child = it.next(); + dq.addLast(child); + } + break; + case SKIP_SUBTREE: + dq.addLast(new Supplier<SchemaVisitorAction>() { + @Override + public SchemaVisitorAction get() { + return visitor.afterVisitNonTerminal(schema); + } + }); + break; + case SKIP_SIBLINGS: + Object current; + //CHECKSTYLE:OFF InnerAssignment + while ((current = dq.pollLast()) instanceof Schema) { + // discard the remaining sibling schemas + } + //CHECKSTYLE:ON + if (current != null) { dq.addLast(current); } // re-queue the parent's post-visit supplier + break; + case TERMINATE: + return true; + default: + throw new UnsupportedOperationException("Invalid action " + action + " for " + schema); + } + return false; + } + + private static boolean visitTerminal(final SchemaVisitor visitor, final Schema schema, + final Deque<Object> dq) { + SchemaVisitorAction action = visitor.visitTerminal(schema); + switch (action) { + case CONTINUE: + break; + case SKIP_SUBTREE: + throw new UnsupportedOperationException("Invalid action " + action + " for " + schema); + case SKIP_SIBLINGS: + Object current; + //CHECKSTYLE:OFF InnerAssignment + while ((current = dq.pollLast()) instanceof Schema) { + // discard the remaining sibling schemas + } + //CHECKSTYLE:ON + if (current != null) { dq.addLast(current); } // re-queue the parent's post-visit supplier + break; + case TERMINATE: + return true; + default: + throw new UnsupportedOperationException("Invalid action " + action + " for " + schema); + } + return false; + } + +}
