hive generation is looking pretty good

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/e84dcd72
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/e84dcd72
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/e84dcd72

Branch: refs/heads/master
Commit: e84dcd7230f02087db2b6bf7ea6d2148d585e77d
Parents: 7b8ef0a
Author: Steve Blackmon @steveblackmon <sblack...@apache.org>
Authored: Tue May 3 16:32:58 2016 -0500
Committer: Steve Blackmon @steveblackmon <sblack...@apache.org>
Committed: Wed Jun 1 12:49:27 2016 -0500

----------------------------------------------------------------------
 streams-plugins/streams-plugin-hive/pom.xml     |  23 +-
 .../plugins/StreamsHiveResourceGenerator.java   | 221 -------------
 .../StreamsHiveResourceGeneratorMojo.java       |  68 ----
 .../hive/StreamsHiveGenerationConfig.java       |  84 +++++
 .../hive/StreamsHiveResourceGenerator.java      | 322 +++++++++++++++++++
 .../hive/StreamsHiveResourceGeneratorMojo.java  |  76 +++++
 .../test/StreamsHiveResourceGeneratorTest.java  |  88 ++++-
 .../src/test/resources/expected/activity.hql    | 203 ++++++++++++
 .../src/test/resources/expected/collection.hql  |  47 +++
 .../src/test/resources/expected/media_link.hql  |  11 +
 .../src/test/resources/expected/object.hql      |  61 ++++
 .../resources/expected/objectTypes/place.hql    |  79 +++++
 .../test/resources/expected/verbs/purchase.hql  | 203 ++++++++++++
 streams-schemas/pom.xml                         |  37 ++-
 .../org/apache/streams/schema/FieldType.java    |  13 +
 .../org/apache/streams/schema/FieldUtil.java    |  29 ++
 .../org/apache/streams/schema/FileUtil.java     |  69 ++++
 .../apache/streams/schema/GenerationConfig.java | 115 +++++++
 .../java/org/apache/streams/schema/Schema.java  |  57 ++++
 .../org/apache/streams/schema/SchemaStore.java  | 283 ++++++++++++++++
 .../org/apache/streams/schema/SchemaUtil.java   |  50 +++
 .../java/org/apache/streams/schema/URIUtil.java |  31 ++
 22 files changed, 1860 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/pom.xml
----------------------------------------------------------------------
diff --git a/streams-plugins/streams-plugin-hive/pom.xml 
b/streams-plugins/streams-plugin-hive/pom.xml
index 22d75ce..b173a8d 100644
--- a/streams-plugins/streams-plugin-hive/pom.xml
+++ b/streams-plugins/streams-plugin-hive/pom.xml
@@ -40,6 +40,12 @@
             <groupId>org.apache.streams</groupId>
             <artifactId>streams-config</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>commons-logging</artifactId>
+                    <groupId>commons-logging</groupId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.streams</groupId>
@@ -47,15 +53,13 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.jsonschema2pojo</groupId>
-            <artifactId>jsonschema2pojo-core</artifactId>
-        </dependency>
-        <dependency>
             <groupId>org.apache.streams</groupId>
-            <artifactId>streams-pojo</artifactId>
+            <artifactId>streams-schemas</artifactId>
             <version>${project.version}</version>
-            <scope>test</scope>
-            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
         </dependency>
         <dependency>
             <groupId>org.reflections</groupId>
@@ -219,9 +223,10 @@
                             <goal>unpack-dependencies</goal>
                         </goals>
                         <configuration>
-                            
<includeArtifactIds>streams-pojo</includeArtifactIds>
+                            
<includeGroupIds>org.apache.streams</includeGroupIds>
+                            
<includeArtifactIds>streams-schemas</includeArtifactIds>
                             <includes>**/*.json</includes>
-                            
<outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                            
<outputDirectory>${project.build.directory}/test-classes/streams-schemas</outputDirectory>
                         </configuration>
                     </execution>
                 </executions>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
deleted file mode 100644
index 1efb15e..0000000
--- 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGenerator.java
+++ /dev/null
@@ -1,221 +0,0 @@
-package org.apache.streams.plugins;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.reflections.ReflectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.Serializable;
-import java.lang.reflect.Field;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.StandardOpenOption;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Created by sblackmon on 11/18/15.
- */
-public class StreamsHiveResourceGenerator implements Runnable {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsHiveResourceGenerator.class);
-
-    private final static String LS = System.getProperty("line.separator");
-
-    private StreamsHiveResourceGeneratorMojo mojo;
-
-    String inDir = "./target/test-classes/activities";
-    String outDir = "./target/generated-sources/hive";
-
-    public void main(String[] args) {
-        StreamsHiveResourceGenerator streamsHiveResourceGenerator = new 
StreamsHiveResourceGenerator();
-        Thread thread = new Thread(streamsHiveResourceGenerator);
-        thread.start();
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            LOGGER.error("InterruptedException", e);
-        } catch (Exception e) {
-            LOGGER.error("Exception", e);
-        }
-        return;
-    }
-
-    public StreamsHiveResourceGenerator(StreamsHiveResourceGeneratorMojo mojo) 
{
-        this.mojo = mojo;
-        if (    mojo != null &&
-                mojo.getTarget() != null &&
-                !Strings.isNullOrEmpty(mojo.getTarget().getAbsolutePath())
-            )
-            outDir = mojo.getTarget().getAbsolutePath();
-
-        if (    mojo != null &&
-                mojo.getPackages() != null &&
-                mojo.getPackages().length > 0
-            )
-            packages = mojo.getPackages();
-    }
-
-    public StreamsHiveResourceGenerator() {
-    }
-
-    public void run() {
-
-        List<File> schemaFiles;
-
-
-        List<Class<?>> serializableClasses = detectSerializableClasses();
-
-        LOGGER.info("Detected {} serialiables:", serializableClasses.size());
-        for( Class clazz : serializableClasses )
-            LOGGER.debug(clazz.toString());
-
-        List<Class<?>> pojoClasses = detectPojoClasses(serializableClasses);
-
-        LOGGER.info("Detected {} pojos:", pojoClasses.size());
-        for( Class clazz : pojoClasses ) {
-            LOGGER.debug(clazz.toString());
-
-        }
-
-
-        for( Class clazz : pojoClasses ) {
-            String pojoPath = 
clazz.getPackage().getName().replace(".pojo.json", 
".hive").replace(".","/")+"/";
-            String pojoName = clazz.getSimpleName()+".hql";
-            String pojoHive = renderPojo(clazz);
-            writeFile(outDir+"/"+pojoPath+pojoName, pojoHive);
-        }
-
-    }
-
-    private void writeFile(String pojoFile, String pojoHive) {
-        try {
-            File path = new File(pojoFile);
-            File dir = path.getParentFile();
-            if( !dir.exists() )
-                dir.mkdirs();
-            Files.write(Paths.get(pojoFile), pojoHive.getBytes(), 
StandardOpenOption.CREATE_NEW);
-        } catch (Exception e) {
-            LOGGER.error("Write Exception: {}", e);
-        }
-    }
-
-    public List<Class<?>> detectSerializableClasses() {
-
-        Set<Class<? extends Serializable>> classes =
-                reflections.getSubTypesOf(java.io.Serializable.class);
-
-        List<Class<?>> result = Lists.newArrayList();
-
-        for( Class clazz : classes ) {
-            result.add(clazz);
-        }
-
-        return result;
-    }
-
-    public List<Class<?>> detectPojoClasses(List<Class<?>> classes) {
-
-        List<Class<?>> result = Lists.newArrayList();
-
-        for( Class clazz : classes ) {
-            try {
-                clazz.newInstance().toString();
-            } catch( Exception e) {}
-            // super-halfass way to know if this is a jsonschema2pojo
-            if( clazz.getAnnotations().length >= 1 )
-                result.add(clazz);
-        }
-
-        return result;
-    }
-
-    public String renderPojo(Class<?> pojoClass) {
-        StringBuffer stringBuffer = new StringBuffer();
-        stringBuffer.append("CREATE TABLE ");
-        
stringBuffer.append(pojoClass.getPackage().getName().replace(".pojo.json", 
".hive"));
-        stringBuffer.append(LS);
-        stringBuffer.append("(");
-        stringBuffer.append(LS);
-
-        Set<Field> fields = ReflectionUtils.getAllFields(pojoClass);
-        appendFields(stringBuffer, fields, "", ",");
-
-        stringBuffer.append(")");
-
-        return stringBuffer.toString();
-    }
-
-    private void appendFields(StringBuffer stringBuffer, Set<Field> fields, 
String varDef, String fieldDelimiter) {
-        if( fields.size() > 0 ) {
-            stringBuffer.append(LS);
-            Map<String,Field> fieldsToAppend = uniqueFields(fields);
-            for( Iterator<Field> iter = fieldsToAppend.values().iterator(); 
iter.hasNext(); ) {
-                Field field = iter.next();
-                stringBuffer.append(name(field));
-                stringBuffer.append(": ");
-                stringBuffer.append(type(field));
-                if( iter.hasNext()) stringBuffer.append(fieldDelimiter);
-                stringBuffer.append(LS);
-            }
-        } else {
-            stringBuffer.append(LS);
-        }
-    }
-
-    private String value(Field field) {
-        if( field.getName().equals("verb")) {
-            return "\"post\"";
-        } else if( field.getName().equals("objectType")) {
-            return "\"application\"";
-        } else return null;
-    }
-
-    private String type(Field field) {
-        if( field.getType().equals(java.lang.String.class)) {
-            return "STRING";
-        } else if( field.getType().equals(java.lang.Integer.class)) {
-            return "INT";
-        } else if( field.getType().equals(org.joda.time.DateTime.class)) {
-            return "DATE";
-        }else if( field.getType().equals(java.util.Map.class)) {
-            return "MAP";
-        } else if( field.getType().equals(java.util.List.class)) {
-            return "ARRAY";
-        }
-        return field.getType().getCanonicalName().replace(".pojo.json", 
".scala");
-    }
-
-    private Map<String,Field> uniqueFields(Set<Field> fieldset) {
-        Map<String,Field> fields = Maps.newTreeMap();
-        Field item = null;
-        for( Iterator<Field> it = fieldset.iterator(); it.hasNext(); item = 
it.next() ) {
-            if( item != null && item.getName() != null ) {
-                Field added = fields.put(item.getName(), item);
-            }
-            // ensure right class will get used
-        }
-        return fields;
-    }
-
-    private String name(Field field) {
-        if( field.getName().equals("object"))
-            return "obj";
-        else return field.getName();
-    }
-
-    private boolean override(Field field) {
-        try {
-            if( 
field.getDeclaringClass().getSuperclass().getField(field.getName()) != null )
-                return true;
-            else return false;
-        } catch( Exception e ) {
-            return false;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
deleted file mode 100644
index 1f8c782..0000000
--- 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/StreamsHiveResourceGeneratorMojo.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.streams.plugins;
-
-import org.apache.maven.plugin.AbstractMojo;
-import org.apache.maven.plugin.MojoExecutionException;
-import org.apache.maven.plugins.annotations.Component;
-import org.apache.maven.plugins.annotations.Execute;
-import org.apache.maven.plugins.annotations.LifecyclePhase;
-import org.apache.maven.plugins.annotations.Mojo;
-import org.apache.maven.plugins.annotations.Parameter;
-import org.apache.maven.project.MavenProject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-@Mojo(  name = "hive",
-        defaultPhase = LifecyclePhase.GENERATE_SOURCES
-)
-@Execute(   goal = "hive",
-            phase = LifecyclePhase.GENERATE_SOURCES
-)
-public class StreamsHiveResourceGeneratorMojo extends AbstractMojo {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class);
-
-    @Component
-    private MavenProject project;
-
-//    @Component
-//    private Settings settings;
-//
-//    @Parameter( defaultValue = "${localRepository}", readonly = true, 
required = true )
-//    protected ArtifactRepository localRepository;
-//
-//    @Parameter( defaultValue = "${plugin}", readonly = true ) // Maven 3 only
-//    private PluginDescriptor plugin;
-//
-    @Parameter( defaultValue = "${project.basedir}", readonly = true )
-    private File basedir;
-
-    @Parameter(defaultValue = "${project.build.directory}", readonly = true)
-    private File target;
-
-    @Parameter(defaultValue = "org.apache.streams.pojo.json", readonly = true)
-    private String[] packages;
-
-    public void execute() throws MojoExecutionException {
-        StreamsHiveResourceGenerator streamsPojoScala = new 
StreamsHiveResourceGenerator(this);
-        Thread thread = new Thread(streamsPojoScala);
-        thread.start();
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            LOGGER.error("InterruptedException", e);
-        } catch (Exception e) {
-            LOGGER.error("Exception", e);
-        }
-        return;
-    }
-
-    public File getTarget() {
-        return target;
-    }
-
-    public String[] getPackages() {
-        return packages;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
new file mode 100644
index 0000000..7e3bf35
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveGenerationConfig.java
@@ -0,0 +1,84 @@
+package org.apache.streams.plugins.hive;
+
+import org.apache.streams.schema.GenerationConfig;
+import org.jsonschema2pojo.DefaultGenerationConfig;
+import org.jsonschema2pojo.util.URLUtil;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Configures StreamsHiveResourceGenerator
+ *
+ *
+ */
+public class StreamsHiveGenerationConfig extends DefaultGenerationConfig 
implements GenerationConfig {
+
+    public String getSourceDirectory() {
+        return sourceDirectory;
+    }
+
+    public List<String> getSourcePaths() {
+        return sourcePaths;
+    }
+
+    private String sourceDirectory;
+    private List<String> sourcePaths = new ArrayList<String>();
+    private String targetDirectory;
+    private int maxDepth = 1;
+
+    public Set<String> getExclusions() {
+        return exclusions;
+    }
+
+    public void setExclusions(Set<String> exclusions) {
+        this.exclusions = exclusions;
+    }
+
+    private Set<String> exclusions = new HashSet<String>();
+
+    public int getMaxDepth() {
+        return maxDepth;
+    }
+
+    public void setSourceDirectory(String sourceDirectory) {
+        this.sourceDirectory = sourceDirectory;
+    }
+
+    public void setSourcePaths(List<String> sourcePaths) {
+        this.sourcePaths = sourcePaths;
+    }
+
+    public void setTargetDirectory(String targetDirectory) {
+        this.targetDirectory = targetDirectory;
+    }
+
+    @Override
+    public File getTargetDirectory() {
+        return new File(targetDirectory);
+    }
+
+    @Override
+    public Iterator<URL> getSource() {
+        if (null != sourceDirectory) {
+            return 
Collections.singleton(URLUtil.parseURL(sourceDirectory)).iterator();
+        }
+        List<URL> sourceURLs = new ArrayList<URL>();
+        if( sourcePaths != null && sourcePaths.size() > 0)
+            for (String source : sourcePaths) {
+                sourceURLs.add(URLUtil.parseURL(source));
+            }
+        return sourceURLs.iterator();
+    }
+
+    public void setMaxDepth(int maxDepth) {
+        this.maxDepth = maxDepth;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
new file mode 100644
index 0000000..06c1499
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGenerator.java
@@ -0,0 +1,322 @@
+package org.apache.streams.plugins.hive;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import org.apache.streams.schema.FieldType;
+import org.apache.streams.schema.FieldUtil;
+import org.apache.streams.schema.FileUtil;
+import org.apache.streams.schema.GenerationConfig;
+import org.apache.streams.schema.Schema;
+import org.apache.streams.schema.SchemaStore;
+import org.apache.streams.schema.SchemaUtil;
+import org.apache.streams.schema.URIUtil;
+import org.jsonschema2pojo.util.URLUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URI;
+import java.net.URL;
+import java.util.*;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.commons.lang3.StringUtils.defaultString;
+import static org.apache.streams.schema.FileUtil.*;
+
+/**
+ * Generates hive table definitions for using 
org.openx.data.jsonserde.JsonSerDe on new-line delimited json documents.
+ *
+ *
+ */
+public class StreamsHiveResourceGenerator implements Runnable {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsHiveResourceGenerator.class);
+
+    private final static String LS = System.getProperty("line.separator");
+
+    private StreamsHiveGenerationConfig config;
+
+    private SchemaStore schemaStore = new SchemaStore();
+
+    private int currentDepth = 0;
+
+    public void main(String[] args) {
+        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+        String sourceDirectory = "./target/test-classes/activities";
+        String targetDirectory = 
"./target/generated-sources/streams-plugin-hive";
+        String targetPackage = "";
+
+        if( args.length > 0 )
+            sourceDirectory = args[0];
+        if( args.length > 1 )
+            targetDirectory = args[1];
+
+        config.setSourceDirectory(sourceDirectory);
+        config.setTargetDirectory(targetDirectory);
+
+        StreamsHiveResourceGenerator streamsPojoSourceGenerator = new 
StreamsHiveResourceGenerator(config);
+        Thread thread = new Thread(streamsPojoSourceGenerator);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            LOGGER.error("InterruptedException", e);
+        } catch (Exception e) {
+            LOGGER.error("Exception", e);
+        }
+        return;
+    }
+
+    public StreamsHiveResourceGenerator(StreamsHiveGenerationConfig config) {
+        this.config = config;
+    }
+
+    public void run() {
+
+        checkNotNull(config);
+
+        generate(config);
+
+    }
+
+    public void generate(StreamsHiveGenerationConfig config) {
+
+        LinkedList<File> sourceFiles = new LinkedList<File>();
+
+        for (Iterator<URL> sources = config.getSource(); sources.hasNext();) {
+            URL source = sources.next();
+            sourceFiles.add(URLUtil.getFileFromURL(source));
+        }
+
+        LOGGER.info("Seeded with {} source paths:", sourceFiles.size());
+
+        FileUtil.resolveRecursive((GenerationConfig)config, sourceFiles);
+
+        LOGGER.info("Resolved {} schema files:", sourceFiles.size());
+
+        for (Iterator<File> iterator = sourceFiles.iterator(); 
iterator.hasNext();) {
+            File item = iterator.next();
+            schemaStore.create(item.toURI());
+        }
+
+        LOGGER.info("Identified {} objects:", schemaStore.getSize());
+
+        for (Iterator<Schema> schemaIterator = 
schemaStore.getSchemaIterator(); schemaIterator.hasNext(); ) {
+            Schema schema = schemaIterator.next();
+            currentDepth = 0;
+            if( schema.getURI().getScheme().equals("file")) {
+                String inputFile = schema.getURI().getPath();
+                String resourcePath = dropSourcePathPrefix(inputFile, 
config.getSourceDirectory());
+                for (String sourcePath : config.getSourcePaths()) {
+                    resourcePath = dropSourcePathPrefix(resourcePath, 
sourcePath);
+                }
+                String outputFile = config.getTargetDirectory() + "/" + 
swapExtension(resourcePath, "json", "hql");
+
+                LOGGER.info("Processing {}:", resourcePath);
+
+                String resourceId = dropExtension(resourcePath).replace("/", 
"_");
+
+                String resourceContent = generateResource(schema, resourceId);
+
+                writeFile(outputFile, resourceContent);
+
+                LOGGER.info("Wrote {}:", outputFile);
+            }
+        }
+    }
+
+    public String generateResource(Schema schema, String resourceId) {
+        StringBuilder resourceBuilder = new StringBuilder();
+        resourceBuilder.append("CREATE TABLE ");
+        resourceBuilder.append(hqlEscape(resourceId));
+        resourceBuilder.append(LS);
+        resourceBuilder.append("(");
+        resourceBuilder.append(LS);
+        resourceBuilder = appendRootObject(resourceBuilder, schema, 
resourceId, ' ');
+        resourceBuilder.append(")");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("ROW FORMAT SERDE 
'org.openx.data.jsonserde.JsonSerDe'");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("WITH SERDEPROPERTIES 
(\"ignore.malformed.json\" = \"true\"");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("STORED AS TEXTFILE");
+        resourceBuilder.append(LS);
+        resourceBuilder.append("LOCATION '${hiveconf:path}';");
+        resourceBuilder.append(LS);
+        return resourceBuilder.toString();
+    }
+
+    public StringBuilder appendRootObject(StringBuilder builder, Schema 
schema, String resourceId, Character seperator) {
+        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, 
null, resourceId);
+        if( propertiesNode != null && propertiesNode.isObject() && 
propertiesNode.size() > 0) {
+            builder = appendPropertiesNode(builder, schema, propertiesNode, 
seperator);
+        }
+        return builder;
+    }
+
+    private StringBuilder appendValueField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
+        // safe to append nothing
+        checkNotNull(builder);
+        builder.append(hqlEscape(fieldId));
+        builder.append(seperator);
+        builder.append(hqlType(fieldType));
+        return builder;
+    }
+
+    public StringBuilder appendArrayItems(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode itemsNode, Character seperator) {
+        // not safe to append nothing
+        checkNotNull(builder);
+        if( itemsNode == null ) return builder;
+        if( itemsNode.has("type")) {
+            try {
+                FieldType itemType = FieldUtil.determineFieldType(itemsNode);
+                switch( itemType ) {
+                    case OBJECT:
+                        builder = appendArrayObject(builder, schema, fieldId, 
itemsNode, seperator);
+                        break;
+                    case ARRAY:
+                        ObjectNode subArrayItems = (ObjectNode) 
itemsNode.get("items");
+                        builder = appendArrayItems(builder, schema, fieldId, 
subArrayItems, seperator);
+                        break;
+                    default:
+                        builder = appendArrayField(builder, schema, fieldId, 
itemType, seperator);
+                }
+            } catch (Exception e) {
+                LOGGER.warn("No item type resolvable for {}", fieldId);
+            }
+        }
+        checkNotNull(builder);
+        return builder;
+    }
+
+    private StringBuilder appendArrayField(StringBuilder builder, Schema 
schema, String fieldId, FieldType fieldType, Character seperator) {
+        // safe to append nothing
+        checkNotNull(builder);
+        checkNotNull(fieldId);
+        builder.append(hqlEscape(fieldId));
+        builder.append(seperator);
+        builder.append("ARRAY<"+hqlType(fieldType)+">");
+        checkNotNull(builder);
+        return builder;
+    }
+
+    private StringBuilder appendArrayObject(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode fieldNode, Character seperator) {
+        // safe to append nothing
+        checkNotNull(builder);
+        checkNotNull(fieldNode);
+        if( !Strings.isNullOrEmpty(fieldId)) {
+            builder.append(hqlEscape(fieldId));
+            builder.append(seperator);
+        }
+        builder.append("ARRAY");
+        builder.append(LS);
+        builder.append("<");
+        builder.append(LS);
+        ObjectNode propertiesNode = schemaStore.resolveProperties(schema, 
fieldNode, fieldId);
+        builder = appendStructField(builder, schema, "", propertiesNode, ':');
+        builder.append(">");
+        checkNotNull(builder);
+        return builder;
+    }
+
+    private StringBuilder appendStructField(StringBuilder builder, Schema 
schema, String fieldId, ObjectNode propertiesNode, Character seperator) {
+        // safe to append nothing
+        checkNotNull(builder);
+        checkNotNull(propertiesNode);
+
+        if( propertiesNode != null && propertiesNode.isObject() && 
propertiesNode.size() > 0 ) {
+
+            currentDepth += 1;
+
+            if( !Strings.isNullOrEmpty(fieldId)) {
+                builder.append(hqlEscape(fieldId));
+                builder.append(seperator);
+            }
+            builder.append("STRUCT");
+            builder.append(LS);
+            builder.append("<");
+            builder.append(LS);
+
+            builder = appendPropertiesNode(builder, schema, propertiesNode, 
':');
+
+            builder.append(">");
+            builder.append(LS);
+
+            currentDepth -= 1;
+
+        }
+        checkNotNull(builder);
+        return builder;
+    }
+
+    private StringBuilder appendPropertiesNode(StringBuilder builder, Schema 
schema, ObjectNode propertiesNode, Character seperator) {
+        checkNotNull(builder);
+        checkNotNull(propertiesNode);
+        Iterator<Map.Entry<String, JsonNode>> fields = propertiesNode.fields();
+        Joiner joiner = Joiner.on(","+LS).skipNulls();
+        List<String> fieldStrings = Lists.newArrayList();
+        for( ; fields.hasNext(); ) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String fieldId = field.getKey();
+            if( !config.getExclusions().contains(fieldId) && 
field.getValue().isObject()) {
+                ObjectNode fieldNode = (ObjectNode) field.getValue();
+                FieldType fieldType = FieldUtil.determineFieldType(fieldNode);
+                if (fieldType != null ) {
+                    switch (fieldType) {
+                        case ARRAY:
+                            ObjectNode itemsNode = (ObjectNode) 
fieldNode.get("items");
+                            if( currentDepth <= config.getMaxDepth()) {
+                                StringBuilder arrayItemsBuilder = 
appendArrayItems(new StringBuilder(), schema, fieldId, itemsNode, seperator);
+                                if( 
!Strings.isNullOrEmpty(arrayItemsBuilder.toString())) {
+                                    
fieldStrings.add(arrayItemsBuilder.toString());
+                                }
+                            }
+                            break;
+                        case OBJECT:
+                            ObjectNode childProperties = 
schemaStore.resolveProperties(schema, fieldNode, fieldId);
+                            if( currentDepth < config.getMaxDepth()) {
+                                StringBuilder structFieldBuilder = 
appendStructField(new StringBuilder(), schema, fieldId, childProperties, 
seperator);
+                                if( 
!Strings.isNullOrEmpty(structFieldBuilder.toString())) {
+                                    
fieldStrings.add(structFieldBuilder.toString());
+                                }
+                            }
+                            break;
+                        default:
+                            StringBuilder valueFieldBuilder = 
appendValueField(new StringBuilder(), schema, fieldId, fieldType, seperator);
+                            if( 
!Strings.isNullOrEmpty(valueFieldBuilder.toString())) {
+                                fieldStrings.add(valueFieldBuilder.toString());
+                            }
+                    }
+                }
+            }
+        }
+        builder.append(joiner.join(fieldStrings)).append(LS);
+        Preconditions.checkNotNull(builder);
+        return builder;
+    }
+
+    private static String hqlEscape( String fieldId ) {
+        return "`"+fieldId+"`";
+    }
+
+    private static String hqlType( FieldType fieldType ) {
+        switch( fieldType ) {
+            case INTEGER:
+                return "INT";
+            case NUMBER:
+                return "FLOAT";
+            case OBJECT:
+                return "STRUCT";
+            default:
+                return fieldType.name().toUpperCase();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
new file mode 100644
index 0000000..9cf71a9
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/main/java/org/apache/streams/plugins/hive/StreamsHiveResourceGeneratorMojo.java
@@ -0,0 +1,76 @@
+package org.apache.streams.plugins.hive;
+
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+import org.apache.maven.plugins.annotations.Component;
+import org.apache.maven.plugins.annotations.Execute;
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.project.MavenProject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.List;
+
+@Mojo(  name = "hive",
+        defaultPhase = LifecyclePhase.GENERATE_SOURCES
+)
+@Execute(   goal = "hive",
+            phase = LifecyclePhase.GENERATE_SOURCES
+)
+public class StreamsHiveResourceGeneratorMojo extends AbstractMojo {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(StreamsHiveResourceGeneratorMojo.class);
+
+    @Component
+    private MavenProject project;
+
+//    @Component
+//    private Settings settings;
+//
+//    @Parameter( defaultValue = "${localRepository}", readonly = true, 
required = true )
+//    protected ArtifactRepository localRepository;
+//
+//    @Parameter( defaultValue = "${plugin}", readonly = true ) // Maven 3 only
+//    private PluginDescriptor plugin;
+//
+    @Parameter( defaultValue = "${project.basedir}", readonly = true )
+    private File basedir;
+
+    @Parameter( defaultValue = "./src/main/jsonschema", readonly = true ) // 
Maven 3 only
+    public String sourceDirectory;
+
+    @Parameter( readonly = true ) // Maven 3 only
+    public List<String> sourcePaths;
+
+    @Parameter(defaultValue = 
"./target/generated-sources/streams-plugin-hive", readonly = true)
+    public String targetDirectory;
+
+    public void execute() throws MojoExecutionException {
+
+        //addProjectDependenciesToClasspath();
+
+        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+        if( sourcePaths != null && sourcePaths.size() > 0)
+            config.setSourcePaths(sourcePaths);
+        else
+            config.setSourceDirectory(sourceDirectory);
+        config.setTargetDirectory(targetDirectory);
+
+        StreamsHiveResourceGenerator streamsPojoScala = new 
StreamsHiveResourceGenerator(config);
+        Thread thread = new Thread(streamsPojoScala);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            LOGGER.error("InterruptedException", e);
+        } catch (Exception e) {
+            LOGGER.error("Exception", e);
+        }
+        return;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
 
b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
index cd80cf8..9be908d 100644
--- 
a/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
+++ 
b/streams-plugins/streams-plugin-hive/src/test/java/org/apache/streams/plugins/test/StreamsHiveResourceGeneratorTest.java
@@ -1,12 +1,23 @@
 package org.apache.streams.plugins.test;
 
-import org.apache.streams.plugins.StreamsHiveResourceGenerator;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
+import org.apache.streams.plugins.hive.StreamsHiveGenerationConfig;
+import org.apache.streams.plugins.hive.StreamsHiveResourceGenerator;
+import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
 import java.io.File;
-import java.io.FileFilter;
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.streams.schema.FileUtil.dropSourcePathPrefix;
 
 /**
  * Test that Activity beans are compatible with the example activities in the 
spec.
@@ -21,24 +32,79 @@ public class StreamsHiveResourceGeneratorTest {
      * @throws Exception
      */
     @Test
-    public void testStreamsPojoHive() throws Exception {
-        StreamsHiveResourceGenerator streamsHiveResourceGenerator = new 
StreamsHiveResourceGenerator();
-        streamsHiveResourceGenerator.main(new String[0]);
+    public void StreamsHiveResourceGenerator() throws Exception {
+
+        StreamsHiveGenerationConfig config = new StreamsHiveGenerationConfig();
+
+        String sourceDirectory = "target/test-classes/streams-schemas";
+
+        config.setSourceDirectory(sourceDirectory);
+
+        config.setTargetDirectory("target/generated-sources/test");
+
+        config.setExclusions(Sets.newHashSet("attachments"));
+
+        config.setMaxDepth(2);
+
+        StreamsHiveResourceGenerator streamsHiveResourceGenerator = new 
StreamsHiveResourceGenerator(config);
+        Thread thread = new Thread(streamsHiveResourceGenerator);
+        thread.start();
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            LOGGER.error("InterruptedException", e);
+        } catch (Exception e) {
+            LOGGER.error("Exception", e);
+        }
 
-        File testOutput = new File( 
"./target/generated-sources/hive/org/apache/streams/hive");
-        FileFilter hqlFilter = new FileFilter() {
+        File testOutput = new File( "./target/generated-sources/test");
+        Predicate<File> hqlFilter = new Predicate<File>() {
             @Override
-            public boolean accept(File pathname) {
-                if( pathname.getName().endsWith(".hql") )
+            public boolean apply(@Nullable File file) {
+                if( file.getName().endsWith(".hql") )
                     return true;
-                return false;
+                else return false;
             }
         };
 
         assert( testOutput != null );
         assert( testOutput.exists() == true );
         assert( testOutput.isDirectory() == true );
-        assert( testOutput.listFiles(hqlFilter).length == 11 );
+
+        Iterable<File> outputIterator = 
Files.fileTreeTraverser().breadthFirstTraversal(testOutput)
+                .filter(hqlFilter);
+        Collection<File> outputCollection = Lists.newArrayList(outputIterator);
+        assert( outputCollection.size() == 133 );
+
+        String expectedDirectory = "target/test-classes/expected";
+        File testExpected = new File( expectedDirectory );
+
+        Iterable<File> expectedIterator = 
Files.fileTreeTraverser().breadthFirstTraversal(testExpected)
+                .filter(hqlFilter);
+        Collection<File> expectedCollection = 
Lists.newArrayList(expectedIterator);
+
+        int fails = 0;
+
+        Iterator<File> iterator = expectedCollection.iterator();
+        while( iterator.hasNext() ) {
+            File objectExpected = iterator.next();
+            String expectedEnd = 
dropSourcePathPrefix(objectExpected.getAbsolutePath(),  expectedDirectory);
+            File objectActual = new File(config.getTargetDirectory() + "/" + 
expectedEnd);
+            LOGGER.info("Comparing: {} and {}", 
objectExpected.getAbsolutePath(), objectActual.getAbsolutePath());
+            assert( objectActual.exists());
+            if( FileUtils.contentEquals(objectActual, objectExpected) == true 
) {
+                LOGGER.info("Exact Match!");
+            } else {
+                LOGGER.info("No Match!");
+                fails++;
+            }
+        }
+        if( fails > 0 ) {
+            LOGGER.info("Fails: {}", fails);
+            Assert.fail();
+        }
+
+
 //        assert( new File(testOutput + "/traits").exists() == true );
 //        assert( new File(testOutput + "/traits").isDirectory() == true );
 //        assert( new File(testOutput + "/traits").listFiles(scalaFilter) != 
null );

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
new file mode 100644
index 0000000..49d0f41
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/activity.hql
@@ -0,0 +1,203 @@
+CREATE TABLE `activity`
+(
+`id` STRING,
+`actor` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`verb` STRING,
+`object` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`target` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`generator` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`icon` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`provider` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`title` STRING,
+`content` STRING,
+`url` STRING,
+`links` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
new file mode 100644
index 0000000..94b986f
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/collection.hql
@@ -0,0 +1,47 @@
+CREATE TABLE `collection`
+(
+`url` STRING,
+`totalItems` INT,
+`items` ARRAY
+<
+STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
new file mode 100644
index 0000000..d0afe0f
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/media_link.hql
@@ -0,0 +1,11 @@
+CREATE TABLE `media_link`
+(
+`duration` FLOAT,
+`height` INT,
+`width` INT,
+`url` STRING
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
new file mode 100644
index 0000000..fb44767
--- /dev/null
+++ b/streams-plugins/streams-plugin-hive/src/test/resources/expected/object.hql
@@ -0,0 +1,61 @@
+CREATE TABLE `object`
+(
+`id` STRING,
+`image` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName` STRING,
+`summary` STRING,
+`content` STRING,
+`url` STRING,
+`objectType` STRING,
+`author` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`upstreamDuplicates` ARRAY<STRING>,
+`downstreamDuplicates` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
new file mode 100644
index 0000000..b861fad
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/objectTypes/place.hql
@@ -0,0 +1,79 @@
+CREATE TABLE `objectTypes_place`
+(
+`id` STRING,
+`image` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName` STRING,
+`summary` STRING,
+`content` STRING,
+`url` STRING,
+`objectType` STRING,
+`author` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`upstreamDuplicates` ARRAY<STRING>,
+`downstreamDuplicates` ARRAY<STRING>,
+`address` STRUCT
+<
+`post-office-box`:STRING,
+`extended-address`:STRING,
+`street-address`:STRING,
+`locality`:STRING,
+`region`:STRING,
+`postal-code`:STRING,
+`country-name`:STRING
+>
+,
+`position` STRUCT
+<
+`altitude`:FLOAT,
+`latitude`:FLOAT,
+`longitude`:FLOAT
+>
+
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
----------------------------------------------------------------------
diff --git 
a/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
new file mode 100644
index 0000000..4a05269
--- /dev/null
+++ 
b/streams-plugins/streams-plugin-hive/src/test/resources/expected/verbs/purchase.hql
@@ -0,0 +1,203 @@
+CREATE TABLE `verbs_purchase`
+(
+`id` STRING,
+`actor` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`verb` STRING,
+`object` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`target` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published` STRING,
+`updated` STRING,
+`generator` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`icon` STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`provider` STRUCT
+<
+`id`:STRING,
+`image`:STRUCT
+<
+`duration`:FLOAT,
+`height`:INT,
+`width`:INT,
+`url`:STRING
+>
+,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`author`:STRUCT
+<
+`id`:STRING,
+`displayName`:STRING,
+`summary`:STRING,
+`content`:STRING,
+`url`:STRING,
+`objectType`:STRING,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`published`:STRING,
+`updated`:STRING,
+`upstreamDuplicates`:ARRAY<STRING>,
+`downstreamDuplicates`:ARRAY<STRING>
+>
+,
+`title` STRING,
+`content` STRING,
+`url` STRING,
+`links` ARRAY<STRING>
+)
+ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
+WITH SERDEPROPERTIES ("ignore.malformed.json" = "true"
+STORED AS TEXTFILE
+LOCATION '${hiveconf:path}';

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/pom.xml
----------------------------------------------------------------------
diff --git a/streams-schemas/pom.xml b/streams-schemas/pom.xml
index d64359a..0625cb6 100644
--- a/streams-schemas/pom.xml
+++ b/streams-schemas/pom.xml
@@ -29,9 +29,44 @@
     <artifactId>streams-schemas</artifactId>
     <name>${project.artifactId}</name>
 
-    <description>Activity Streams schemas</description>
+    <description>Activity Streams schemas and schema utilities</description>
 
+    <dependencies>
+        <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.jsonschema2pojo</groupId>
+            <artifactId>jsonschema2pojo-core</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+        </dependency>
+
+    </dependencies>
     <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
         <resources>
             <resource>
                 <directory>src/main/resources</directory>

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
new file mode 100644
index 0000000..1ad9dcc
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldType.java
@@ -0,0 +1,13 @@
+package org.apache.streams.schema;
+
+/**
+ * Created by steve on 5/1/16.
+ */
+public enum FieldType {
+    STRING,
+    INTEGER,
+    NUMBER,
+    BOOLEAN,
+    OBJECT,
+    ARRAY
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
new file mode 100644
index 0000000..5f83767
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FieldUtil.java
@@ -0,0 +1,29 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Created by steve on 5/1/16.
+ */
+public class FieldUtil {
+    public static FieldType determineFieldType(ObjectNode fieldNode) {
+        String typeSchemaField = "type";
+        if( !fieldNode.has(typeSchemaField))
+            return null;
+        String typeSchemaFieldValue = fieldNode.get(typeSchemaField).asText();
+        if( typeSchemaFieldValue.equals("string")) {
+            return FieldType.STRING;
+        } else if( typeSchemaFieldValue.equals("integer")) {
+            return FieldType.INTEGER;
+        } else if( typeSchemaFieldValue.equals("number")) {
+            return FieldType.NUMBER;
+        } else if( typeSchemaFieldValue.equals("object")) {
+            return FieldType.OBJECT;
+        } else if( typeSchemaFieldValue.equals("boolean")) {
+            return FieldType.BOOLEAN;
+        } else if( typeSchemaFieldValue.equals("array")) {
+            return FieldType.ARRAY;
+        }
+        else return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
new file mode 100644
index 0000000..6b171f3
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/FileUtil.java
@@ -0,0 +1,69 @@
+package org.apache.streams.schema;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Created by steve on 5/1/16.
+ */
+public class FileUtil {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(FileUtil.class);
+
+    public static String dropSourcePathPrefix(String inputFile, String 
sourceDirectory) {
+        if(Strings.isNullOrEmpty(sourceDirectory))
+            return inputFile;
+        else if( inputFile.contains(sourceDirectory) ) {
+            return 
inputFile.substring(inputFile.indexOf(sourceDirectory)+sourceDirectory.length()+1);
+        } else return inputFile;
+    }
+
+    public static String swapExtension(String inputFile, String 
originalExtension, String newExtension) {
+        if(inputFile.endsWith("."+originalExtension))
+            return inputFile.replace("."+originalExtension, "."+newExtension);
+        else return inputFile;
+    }
+
+    public static String dropExtension(String inputFile) {
+        if(inputFile.contains("."))
+            return inputFile.substring(0, inputFile.lastIndexOf("."));
+        else return inputFile;
+    }
+
+    public static void writeFile(String resourceFile, String resourceContent) {
+        try {
+            File path = new File(resourceFile);
+            File dir = path.getParentFile();
+            if( !dir.exists() )
+                dir.mkdirs();
+            Files.write(Paths.get(resourceFile), resourceContent.getBytes(), 
StandardOpenOption.CREATE_NEW);
+        } catch (Exception e) {
+            LOGGER.error("Write Exception: {}", e);
+        }
+    }
+
+    public static void resolveRecursive(GenerationConfig config, List<File> 
schemaFiles) {
+
+        Preconditions.checkArgument(schemaFiles.size() > 0);
+        int i = 0;
+        while( schemaFiles.size() > i) {
+            File child = schemaFiles.get(i);
+            if (child.isDirectory()) {
+                
schemaFiles.addAll(Arrays.asList(child.listFiles(config.getFileFilter())));
+                schemaFiles.remove(child);
+            } else {
+                i += 1;
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
new file mode 100644
index 0000000..e0469c9
--- /dev/null
+++ 
b/streams-schemas/src/main/java/org/apache/streams/schema/GenerationConfig.java
@@ -0,0 +1,115 @@
+package org.apache.streams.schema;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.net.URL;
+import java.util.Iterator;
+
+/**
+ * Created by sblackmon on 5/3/16.
+ */
+public interface GenerationConfig {
+
+    /**
+     * Gets the 'source' configuration option.
+     *
+     * @return The source file(s) or directory(ies) from which JSON Schema will
+     *         be read.
+     */
+    Iterator<URL> getSource();
+
+    /**
+     * Gets the 'targetDirectory' configuration option.
+     *
+     * @return The target directory into which generated types will be written
+     *         (may or may not exist before types are written)
+     */
+    File getTargetDirectory();
+
+    /**
+     * Gets the 'outputEncoding' configuration option.
+     *
+     * @return The character encoding that should be used when writing output 
files.
+     */
+    String getOutputEncoding();
+
+    /**
+     * Gets the file filter used to isolate the schema mapping files in the
+     * source directories.
+     *
+     * @return the file filter use when scanning for schema files.
+     */
+    FileFilter getFileFilter();
+
+    /**
+     * Gets the 'includeAdditionalProperties' configuration option.
+     *
+     * @return Whether to allow 'additional properties' support in objects.
+     *         Setting this to false will disable additional properties 
support,
+     *         regardless of the input schema(s).
+     */
+    boolean isIncludeAdditionalProperties();
+
+    /**
+     * Gets the 'targetVersion' configuration option.
+     *
+     *  @return The target version for generated source files.
+     */
+    String getTargetVersion();
+
+//    /**
+//     * Gets the `includeDynamicAccessors` configuraiton option.
+//     *
+//     * @return Whether to include dynamic getters, setters, and builders
+//     *         or to omit these methods.
+//     */
+//    boolean isIncludeDynamicAccessors();
+
+//    /**
+//     * Gets the `dateTimeType` configuration option.
+//     *         <p>
+//     *         Example values:
+//     *         <ul>
+//     *         <li><code>org.joda.time.LocalDateTime</code> (Joda)</li>
+//     *         <li><code>java.time.LocalDateTime</code> (JSR310)</li>
+//     *         <li><code>null</code> (default behavior)</li>
+//     *         </ul>
+//     *
+//     * @return The java type to use instead of {@link java.util.Date}
+//     *         when adding date type fields to generate Java types.
+//     */
+//    String getDateTimeType();
+//
+//    /**
+//     * Gets the `dateType` configuration option.
+//     *         <p>
+//     *         Example values:
+//     *         <ul>
+//     *         <li><code>org.joda.time.LocalDate</code> (Joda)</li>
+//     *         <li><code>java.time.LocalDate</code> (JSR310)</li>
+//     *         <li><code>null</code> (default behavior)</li>
+//     *         </ul>
+//     *
+//     * @return The java type to use instead of string
+//     *         when adding string type fields with a format of date (not
+//     *         date-time) to generated Java types.
+//     */
+//    String getDateType();
+//
+//    /**
+//     * Gets the `timeType` configuration option.
+//     *         <p>
+//     *         Example values:
+//     *         <ul>
+//     *         <li><code>org.joda.time.LocalTime</code> (Joda)</li>
+//     *         <li><code>java.time.LocalTime</code> (JSR310)</li>
+//     *         <li><code>null</code> (default behavior)</li>
+//     *         </ul>
+//     *
+//     * @return The java type to use instead of string
+//     *         when adding string type fields with a format of time (not
+//     *         date-time) to generated Java types.
+//     */
+//    String getTimeType();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
new file mode 100644
index 0000000..ea75ffd
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/Schema.java
@@ -0,0 +1,57 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+import java.net.URI;
+
+/**
+ * A JSON Schema document.
+ */
+public class Schema {
+
+    private final URI id;
+    private final URI uri;
+    private final JsonNode content;
+    private final Schema parent;
+    private final boolean generate;
+
+    public Schema(URI uri, JsonNode content, Schema parent, boolean generate) {
+        this.uri = uri;
+        this.content = content;
+        this.parent = parent;
+        this.generate = generate;
+        this.id = content.has("id") ? URI.create(content.get("id").asText()) : 
null;
+    }
+
+    public URI getId() {
+        return id;
+    }
+
+    public URI getURI() {
+        return uri;
+    }
+
+    public JsonNode getContent() {
+        return content;
+    }
+
+    public JsonNode getParentContent() {
+        if( parent != null )
+            return parent.getContent();
+        else return null;
+    }
+
+    public URI getParentURI() {
+        if( parent != null ) return parent.getURI();
+        else return null;
+    }
+
+    public boolean isGenerated() {
+        return generate;
+    }
+
+    public Schema getParent() {
+        return parent;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
new file mode 100644
index 0000000..e612aff
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaStore.java
@@ -0,0 +1,283 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Optional;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Ordering;
+import org.apache.commons.lang3.StringUtils;
+import org.jsonschema2pojo.ContentResolver;
+import org.jsonschema2pojo.FragmentResolver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.streams.schema.URIUtil.safeResolve;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaStore extends Ordering<Schema> {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(SchemaStore.class);
+    private final static JsonNodeFactory NODE_FACTORY = 
JsonNodeFactory.instance;
+
+    protected Map<URI, Schema> schemas = new HashMap();
+    protected FragmentResolver fragmentResolver = new FragmentResolver();
+    protected ContentResolver contentResolver = new ContentResolver();
+
+    public SchemaStore() {
+    }
+
+    public synchronized Schema create(URI uri) {
+        if(!getByUri(uri).isPresent()) {
+            URI baseURI = URIUtil.removeFragment(uri);
+            JsonNode baseNode = this.contentResolver.resolve(baseURI);
+            if(uri.toString().contains("#") && !uri.toString().endsWith("#")) {
+                Schema newSchema = new Schema(baseURI, baseNode, null, true);
+                this.schemas.put(baseURI, newSchema);
+                JsonNode childContent = 
this.fragmentResolver.resolve(baseNode, '#' + 
StringUtils.substringAfter(uri.toString(), "#"));
+                this.schemas.put(uri, new Schema(uri, childContent, newSchema, 
false));
+            } else {
+                if( baseNode.has("extends") && 
baseNode.get("extends").isObject()) {
+                    URI ref = 
URI.create(((ObjectNode)baseNode.get("extends")).get("$ref").asText());
+                    URI absoluteURI;
+                    if( ref.isAbsolute())
+                        absoluteURI = ref;
+                    else
+                        absoluteURI = baseURI.resolve(ref);
+                    JsonNode parentNode = 
this.contentResolver.resolve(absoluteURI);
+                    Schema parentSchema = null;
+                    if( this.schemas.get(absoluteURI) != null ) {
+                        parentSchema = this.schemas.get(absoluteURI);
+                    } else {
+                        parentSchema = create(absoluteURI);
+                    }
+                    this.schemas.put(uri, new Schema(uri, baseNode, 
parentSchema, true));
+                } else {
+                    this.schemas.put(uri, new Schema(uri, baseNode, null, 
true));
+                }
+            }
+            List<JsonNode> refs = baseNode.findValues("$ref");
+            for( JsonNode ref : refs ) {
+                if( ref.isValueNode() ) {
+                    String refVal = ref.asText();
+                    URI refURI = null;
+                    try {
+                        refURI = URI.create(refVal);
+                    } catch( Exception e ) {
+                        LOGGER.info("Exception: {}", e.getMessage());
+                    }
+                    if (refURI != null && !getByUri(refURI).isPresent()) {
+                        if (refURI.isAbsolute())
+                            create(refURI);
+                        else
+                            create(baseURI.resolve(refURI));
+                    }
+                }
+            }
+        }
+
+        return this.schemas.get(uri);
+    }
+
+    public Schema create(Schema parent, String path) {
+        if(path.equals("#")) {
+            return parent;
+        } else {
+            path = StringUtils.stripEnd(path, "#?&/");
+            URI id = parent != null && parent.getId() != 
null?parent.getId().resolve(path):URI.create(path);
+            if(this.selfReferenceWithoutParentFile(parent, path)) {
+                this.schemas.put(id, new Schema(id, 
this.fragmentResolver.resolve(parent.getParentContent(), path), parent, false));
+                return this.schemas.get(id);
+            } else {
+                return this.create(id);
+            }
+        }
+    }
+
+    protected boolean selfReferenceWithoutParentFile(Schema parent, String 
path) {
+        return parent != null && (parent.getId() == null || 
parent.getId().toString().startsWith("#/")) && path.startsWith("#/");
+    }
+
+    public synchronized void clearCache() {
+        this.schemas.clear();
+    }
+
+    public Integer getSize() {
+        return schemas.size();
+    }
+
+    public Optional<Schema> getById(URI id) {
+        for( Schema schema : schemas.values() ) {
+            if( schema.getId() != null && schema.getId().equals(id) )
+                return Optional.of(schema);
+        }
+        return Optional.absent();
+    }
+
+    public Optional<Schema> getByUri(URI uri) {
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().equals(uri) )
+                return Optional.of(schema);
+        }
+        return Optional.absent();
+    }
+
+    public Integer getFileUriCount() {
+        int count = 0;
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().getScheme().equals("file") )
+                count++;
+        }
+        return count;
+    }
+
+    public Integer getHttpUriCount() {
+        int count = 0;
+        for( Schema schema : schemas.values() ) {
+            if( schema.getURI().getScheme().equals("http") )
+                count++;
+        }
+        return count;
+    }
+
+    public Iterator<Schema> getSchemaIterator() {
+        List<Schema> schemaList = Lists.newArrayList(schemas.values());
+        Collections.sort(schemaList, this);
+        return schemaList.iterator();
+    }
+
+    public ObjectNode resolveProperties(Schema schema, ObjectNode fieldNode, 
String resourceId) {
+        // this should return something more suitable like:
+        //   Map<String, Pair<Schema, ObjectNode>>
+        ObjectNode schemaProperties = NODE_FACTORY.objectNode();
+        ObjectNode parentProperties = NODE_FACTORY.objectNode();
+        if (fieldNode == null) {
+            ObjectNode schemaContent = (ObjectNode) schema.getContent();
+            if( schemaContent.has("properties") ) {
+                schemaProperties = (ObjectNode) 
schemaContent.get("properties");
+                if (schema.getParentContent() != null) {
+                    ObjectNode parentContent = (ObjectNode) 
schema.getParentContent();
+                    if (parentContent.has("properties")) {
+                        parentProperties = (ObjectNode) 
parentContent.get("properties");
+                    }
+                }
+            }
+        } else if (fieldNode != null && fieldNode.size() > 0) {
+            if( fieldNode.has("properties") && 
fieldNode.get("properties").isObject() && fieldNode.get("properties").size() > 
0 )
+                schemaProperties = (ObjectNode) fieldNode.get("properties");
+            URI parentURI = null;
+            if( fieldNode.has("$ref") || fieldNode.has("extends") ) {
+                JsonNode refNode = fieldNode.get("$ref");
+                JsonNode extendsNode = fieldNode.get("extends");
+                if (refNode != null && refNode.isValueNode())
+                    parentURI = URI.create(refNode.asText());
+                else if (extendsNode != null && extendsNode.isObject())
+                    parentURI = URI.create(extendsNode.get("$ref").asText());
+                ObjectNode parentContent = null;
+                URI absoluteURI;
+                if (parentURI.isAbsolute())
+                    absoluteURI = parentURI;
+                else {
+                    absoluteURI = schema.getURI().resolve(parentURI);
+                    if (!absoluteURI.isAbsolute() || (absoluteURI.isAbsolute() 
&& !getByUri(absoluteURI).isPresent() ))
+                        absoluteURI = schema.getParentURI().resolve(parentURI);
+                }
+                if (absoluteURI != null && absoluteURI.isAbsolute()) {
+                    if (getByUri(absoluteURI).isPresent())
+                        parentContent = (ObjectNode) 
getByUri(absoluteURI).get().getContent();
+                    if (parentContent != null && parentContent.isObject() && 
parentContent.has("properties")) {
+                        parentProperties = (ObjectNode) 
parentContent.get("properties");
+                    } else if (absoluteURI.getPath().endsWith("#properties")) {
+                        absoluteURI = 
URI.create(absoluteURI.toString().replace("#properties", ""));
+                        parentProperties = (ObjectNode) 
getByUri(absoluteURI).get().getContent().get("properties");
+                    }
+                }
+            }
+
+
+        }
+
+        ObjectNode resolvedProperties = NODE_FACTORY.objectNode();
+        if (parentProperties != null && parentProperties.size() > 0)
+            resolvedProperties = SchemaUtil.mergeProperties(schemaProperties, 
parentProperties);
+        else resolvedProperties = schemaProperties.deepCopy();
+
+        return resolvedProperties;
+    }
+
+    @Override
+    public int compare(Schema left, Schema right) {
+        // are they the same?
+        if( left.equals(right)) return 0;
+        // is one an ancestor of the other
+        Schema candidateAncestor = left;
+        while( candidateAncestor.getParent() != null ) {
+            candidateAncestor = candidateAncestor.getParent();
+            if( candidateAncestor.equals(right))
+                return 1;
+        }
+        candidateAncestor = right;
+        while( candidateAncestor.getParent() != null ) {
+            candidateAncestor = candidateAncestor.getParent();
+            if( candidateAncestor.equals(left))
+                return -1;
+        }
+        // does one have a field that reference the other?
+        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> resolvedURI = safeResolve(left.getURI(), refText);
+            if( resolvedURI.isPresent() && 
resolvedURI.get().equals(right.getURI()))
+                return 1;
+        }
+        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> resolvedURI = safeResolve(right.getURI(), refText);
+            if( resolvedURI.isPresent() && 
resolvedURI.get().equals(left.getURI()))
+                return -1;
+        }
+        // does one have a field that reference a third schema that references 
the other?
+        for( JsonNode refNode : left.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> possibleConnectorURI = safeResolve(left.getURI(), 
refText);
+            if( possibleConnectorURI.isPresent()) {
+                Optional<Schema> possibleConnector = 
getByUri(possibleConnectorURI.get());
+                if (possibleConnector.isPresent()) {
+                    for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
+                        String connectorRefText = connectorRefNode.asText();
+                        Optional<URI> resolvedURI = 
safeResolve(possibleConnector.get().getURI(), connectorRefText);
+                        if (resolvedURI.isPresent() && 
resolvedURI.get().equals(right.getURI()))
+                            return 1;
+                    }
+                }
+            }
+        }
+        for( JsonNode refNode : right.getContent().findValues("$ref") ) {
+            String refText = refNode.asText();
+            Optional<URI> possibleConnectorURI = safeResolve(right.getURI(), 
refText);
+            if( possibleConnectorURI.isPresent()) {
+                Optional<Schema> possibleConnector = 
getByUri(possibleConnectorURI.get());
+                if (possibleConnector.isPresent()) {
+                    for (JsonNode connectorRefNode : 
possibleConnector.get().getContent().findValues("$ref")) {
+                        String connectorRefText = connectorRefNode.asText();
+                        Optional<URI> resolvedURI = 
safeResolve(possibleConnector.get().getURI(), connectorRefText);
+                        if (resolvedURI.isPresent() && 
resolvedURI.get().equals(left.getURI()))
+                            return -1;
+                    }
+                }
+            }
+        }
+        return 0;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
new file mode 100644
index 0000000..3e3b300
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/SchemaUtil.java
@@ -0,0 +1,50 @@
+package org.apache.streams.schema;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.jsonschema2pojo.util.NameHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+/**
+ * Created by steve on 4/30/16.
+ */
+public class SchemaUtil {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(SchemaUtil.class);
+    private static final JsonNodeFactory NODE_FACTORY = 
JsonNodeFactory.instance;
+
+    public static String childQualifiedName(String parentQualifiedName, String 
childSimpleName) {
+        String safeChildName = 
childSimpleName.replaceAll(NameHelper.ILLEGAL_CHARACTER_REGEX, "_");
+        return isEmpty(parentQualifiedName) ? safeChildName : 
parentQualifiedName + "." + safeChildName;
+    }
+
+    public static ObjectNode readSchema(URL schemaUrl) {
+
+        ObjectNode schemaNode = NODE_FACTORY.objectNode();
+        schemaNode.put("$ref", schemaUrl.toString());
+        return schemaNode;
+
+    }
+
+    public static ObjectNode mergeProperties(ObjectNode content, ObjectNode 
parent) {
+
+        ObjectNode merged = parent.deepCopy();
+        Iterator<Map.Entry<String, JsonNode>> fields = content.fields();
+        for( ; fields.hasNext(); ) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            String fieldId = field.getKey();
+            merged.put(fieldId, field.getValue().deepCopy());
+        }
+        return merged;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/e84dcd72/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java 
b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
new file mode 100644
index 0000000..04e8904
--- /dev/null
+++ b/streams-schemas/src/main/java/org/apache/streams/schema/URIUtil.java
@@ -0,0 +1,31 @@
+package org.apache.streams.schema;
+
+import com.google.common.base.Optional;
+import org.apache.commons.lang3.StringUtils;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+/**
+ * Created by sblackmon on 5/1/16.
+ */
+public class URIUtil {
+
+    public static URI removeFragment(URI id) {
+        return URI.create(StringUtils.substringBefore(id.toString(), "#"));
+    }
+
+    public static URI removeFile(URI id) {
+        return URI.create(StringUtils.substringBeforeLast(id.toString(), "/"));
+    }
+
+    public static Optional<URI> safeResolve(URI absolute, String relativePart) 
{
+        if( !absolute.isAbsolute()) return Optional.absent();
+        try {
+            return Optional.of(absolute.resolve(relativePart));
+        } catch( IllegalArgumentException e ) {
+            return Optional.absent();
+        }
+    }
+
+}

Reply via email to