HAWQ-178. Add JSON plugin support in code base.

Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fd9c3686
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fd9c3686
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fd9c3686

Branch: refs/heads/master
Commit: fd9c36861506ac94255a34c6a85307bf87ae0f72
Parents: c0d7c4f
Author: Oleksandr Diachenko <[email protected]>
Authored: Wed May 18 16:58:07 2016 -0700
Committer: Oleksandr Diachenko <[email protected]>
Committed: Wed May 18 16:58:07 2016 -0700

----------------------------------------------------------------------
 pxf/build.gradle                                |  34 +-
 pxf/pxf-json/.gitignore                         |   1 +
 .../pxf/plugins/json/ColumnDescriptorCache.java | 119 ++++
 .../hawq/pxf/plugins/json/JsonAccessor.java     |  84 +++
 .../hawq/pxf/plugins/json/JsonRecordReader.java | 176 +++++
 .../hawq/pxf/plugins/json/JsonResolver.java     | 256 +++++++
 .../hawq/pxf/plugins/json/parser/JsonLexer.java | 175 +++++
 .../json/parser/PartitionedJsonParser.java      | 200 ++++++
 .../pxf/plugins/json/JsonExtensionTest.java     | 272 ++++++++
 .../apache/hawq/pxf/plugins/json/PxfUnit.java   | 666 +++++++++++++++++++
 .../pxf/plugins/json/parser/JsonLexerTest.java  | 141 ++++
 .../parser/PartitionedJsonParserNoSeekTest.java |  83 +++
 .../parser/PartitionedJsonParserOffsetTest.java |  55 ++
 .../parser/PartitionedJsonParserSeekTest.java   | 113 ++++
 .../src/test/resources/datatypes-test.json      |  14 +
 .../lexer-tests/array_objects_complex.json      |  24 +
 .../array_objects_complex.json.state            | 149 +++++
 .../lexer-tests/array_objects_empty.json        |   1 +
 .../lexer-tests/array_objects_empty.json.state  |   7 +
 .../resources/lexer-tests/array_of_numbers.json |   1 +
 .../lexer-tests/array_of_numbers.json.state     |   5 +
 .../resources/lexer-tests/object_complex.json   |  22 +
 .../lexer-tests/object_complex.json.state       | 146 ++++
 .../resources/lexer-tests/object_simple.json    |   4 +
 .../lexer-tests/object_simple.json.state        |  24 +
 .../lexer-tests/object_string_escaping.json     |   4 +
 .../object_string_escaping.json.state           |  44 ++
 .../src/test/resources/log4j.properties         |  22 +
 .../src/test/resources/null-tweets.json         |   2 +
 .../noseek/array_objects_complex.json           |  24 +
 .../array_objects_complex.json.expected1.json   |  22 +
 .../noseek/array_objects_complex2.json          |  24 +
 .../array_objects_complex2.json.expected1.json  |   4 +
 .../array_objects_complex2.json.expected2.json  |   4 +
 .../array_objects_same_name_in_child.json       |  24 +
 ...jects_same_name_in_child.json.expected1.json |  22 +
 .../parser-tests/noseek/object_simple.json      |   4 +
 .../noseek/object_simple.json.expected1.json    |   4 +
 .../noseek/object_string_escaping.json          |   4 +
 .../object_string_escaping.json.expected1.json  |   4 +
 .../child-object-before-member/expected.1.json  |  44 ++
 .../seek/child-object-before-member/input.json  |  59 ++
 .../child-object-before-member2/expected.1.json |  44 ++
 .../seek/child-object-before-member2/input.json |  59 ++
 .../parser-tests/seek/multi/expected.1.json     |  22 +
 .../parser-tests/seek/multi/expected.2.json     |  25 +
 .../parser-tests/seek/multi/input.json          |  48 ++
 .../parser-tests/seek/no-elements/input.json    |  24 +
 .../seek/seek-into-mid-object-1/expected.1.json |  22 +
 .../seek/seek-into-mid-object-1/input.json      |  28 +
 .../seek/seek-into-mid-object-2/expected.1.json |  22 +
 .../seek/seek-into-mid-object-2/input.json      |  28 +
 .../seek/seek-into-mid-object-3/expected.1.json |  22 +
 .../seek/seek-into-mid-object-3/input.json      |  28 +
 .../seek/seek-into-string-1/expected.1.json     |  22 +
 .../seek/seek-into-string-1/input.json          |  27 +
 .../seek/seek-into-string-2/expected.1.json     |  22 +
 .../seek/seek-into-string-2/input.json          |  27 +
 .../parser-tests/seek/simple/expected.1.json    |  22 +
 .../parser-tests/seek/simple/input.json         |  24 +
 .../src/test/resources/sample-malformed.json    |   3 +
 pxf/pxf-json/src/test/resources/sample.json     |   3 +
 .../src/test/resources/tweets-broken.json       |  67 ++
 .../test/resources/tweets-pp-with-delete.json   |  79 +++
 pxf/pxf-json/src/test/resources/tweets-pp.json  |  67 ++
 .../resources/tweets-small-with-delete.json     |   4 +
 .../src/test/resources/tweets-small.json        |   4 +
 .../tweets-with-missing-text-attribtute.json    |  18 +
 pxf/pxf-json/src/test/resources/tweets.tar.gz   | Bin 0 -> 796 bytes
 .../test/resources/variable-size-objects.json   |   3 +
 .../src/main/resources/pxf-privatehdp.classpath |   1 +
 .../src/main/resources/pxf-privatephd.classpath |   1 +
 .../src/main/resources/pxf-profiles-default.xml |  14 +
 pxf/settings.gradle                             |   1 +
 74 files changed, 3861 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/build.gradle
----------------------------------------------------------------------
diff --git a/pxf/build.gradle b/pxf/build.gradle
index 3f3d31c..eb716fc 100644
--- a/pxf/build.gradle
+++ b/pxf/build.gradle
@@ -33,7 +33,7 @@ buildscript {
             url 'http://repository.jboss.org/nexus/content/groups/public'
         }
     }
-    
+
     dependencies {
         classpath "com.netflix.nebula:gradle-ospackage-plugin:2.2.6"
         classpath "de.undercouch:gradle-download-task:2.1.0"
@@ -87,7 +87,7 @@ subprojects { subProject ->
         testCompile 'org.powermock:powermock-api-mockito:1.5.1'
         testCompile 'org.mockito:mockito-core:1.9.5'
     }
-    
+
     configurations.all {
         resolutionStrategy {
             // force versions that were specified in dependencies:
@@ -337,6 +337,26 @@ project('pxf-hive') {
     }
 }
 
+project('pxf-json') {
+    dependencies {
+      compile(project(':pxf-hdfs'))
+      compile(project(':pxf-service'))
+      compile "org.apache.commons:commons-lang3:3.0"
+
+      testCompile 'pl.pragmatists:JUnitParams:1.0.2'
+    }
+
+    ospackage {
+      requires('pxf-hdfs', project.version, GREATER | EQUAL)
+
+      from(jar.outputs.files) {
+        into "/usr/lib/pxf-${project.version}"
+      }
+
+      link("/usr/lib/pxf-${project.version}/${project.name}.jar", 
"${project.name}-${project.version}.jar")
+    }
+}
+
 project('pxf-hbase') {
     dependencies {
         compile(project(':pxf-api'))
@@ -404,15 +424,15 @@ task rpm(type: Copy, dependsOn: [subprojects.build, 
distSubprojects.buildRpm]) {
 def tomcatName = "apache-tomcat-${tomcatVersion}"
 def tomcatTargetDir = "tomcat/build/"
 
-    
+
 task tomcatGet << {
-    
+
     apply plugin: 'de.undercouch.download'
     
     def TarGzSuffix = ".tar.gz"
     def tomcatTar = "${tomcatName}${TarGzSuffix}"
     def tomcatUrl = 
"http://archive.apache.org/dist/tomcat/tomcat-7/v${tomcatVersion}/bin/${tomcatTar}";
-           
+
     if (file("${tomcatTargetDir}/${tomcatName}").exists()) {
         println "${tomcatName} already exists, nothing to do"
         return 0
@@ -436,13 +456,13 @@ apply plugin: 'os-package'
 
 task tomcatRpm(type: Rpm) {
     buildDir = 'tomcat/build/'
-     
+
     // clean should not delete the downloaded tarball
     // and RPM, so this is a bogus directory to delete instead.
     clean {
         delete = 'tomcat/build/something'
     }
-     
+
     ospackage {
         packageName 'apache-tomcat'
         summary = 'Apache Tomcat RPM'

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/.gitignore
----------------------------------------------------------------------
diff --git a/pxf/pxf-json/.gitignore b/pxf/pxf-json/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/pxf/pxf-json/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
new file mode 100644
index 0000000..01cd37c
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java
@@ -0,0 +1,119 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+
+/**
+ * Helper class used to retrieve all column details relevant for the json 
processing.
+ */
+public class ColumnDescriptorCache {
+
+       private static Pattern ARRAY_PROJECTION_PATTERN = 
Pattern.compile("(.+)\\[([0-9]+)\\]");
+       private static int ARRAY_NAME_GROUPID = 1;
+       private static int ARRAY_INDEX_GROUPID = 2;
+
+       private final DataType columnType;
+       private final String[] normalizedProjection;
+       private final String arrayNodeName;
+       private final int arrayNodeIndex;
+       private final boolean isArray;
+       private String columnName;
+
+       public ColumnDescriptorCache(ColumnDescriptor columnDescriptor) {
+
+               // HAWQ column type
+               this.columnType = 
DataType.get(columnDescriptor.columnTypeCode());
+
+               this.columnName = columnDescriptor.columnName();
+
+               // Column name can use dot-name convention to specify a nested 
json node.
+               // Break the path into array of path steps called projections
+               String[] projection = 
columnDescriptor.columnName().split("\\.");
+
+               // When the projection contains array reference (e.g. 
projections = foo.bar[66]) then replace the last path
+               // element by the array name (e.g. normalizedProjection = 
foo.bar)
+               normalizedProjection = new String[projection.length];
+
+               // Check if the provided json path (projections) refers to an 
array element.
+               Matcher matcher = 
ARRAY_PROJECTION_PATTERN.matcher(projection[projection.length - 1]);
+               if (matcher.matches()) {
+                       this.isArray = true;
+                       // extracts the array node name from the projection path
+                       this.arrayNodeName = matcher.group(ARRAY_NAME_GROUPID);
+                       // extracts the array index from the projection path
+                       this.arrayNodeIndex = 
Integer.parseInt(matcher.group(ARRAY_INDEX_GROUPID));
+
+                       System.arraycopy(projection, 0, normalizedProjection, 
0, projection.length - 1);
+                       normalizedProjection[projection.length - 1] = 
this.arrayNodeName;
+               } else {
+                       this.isArray = false;
+                       this.arrayNodeName = null;
+                       this.arrayNodeIndex = -1;
+
+                       System.arraycopy(projection, 0, normalizedProjection, 
0, projection.length);
+               }
+       }
+
+       /**
+        * @return Column's type
+        */
+       public DataType getColumnType() {
+               return columnType;
+       }
+
+       /**
+        * @return Returns the column name as defined in the HAWQ table.
+        */
+       public String getColumnName() {
+               return columnName;
+       }
+
+       /**
+        * If the column name contains dots (.) then this name is interpreted 
as path into the target json document pointing
+        * to nested json member. The leftmost path element stands for the root 
in the json document.
+        * 
+        * @return If the column name contains dots (.) list of field names 
that represent the path from the root json node
+        *         to the target nested node.
+        */
+       public String[] getNormalizedProjections() {
+               return normalizedProjection;
+       }
+
+       /**
+        * The 'jsonName[index]' column name conventions is used to point to a 
particular json array element.
+        * 
+        * @return Returns the json index of the referred array element.
+        */
+       public int getArrayNodeIndex() {
+               return arrayNodeIndex;
+       }
+
+       /**
+        * @return Returns true if the column name is a path to json array 
element and false otherwise.
+        */
+       public boolean isArray() {
+               return isArray;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
new file mode 100644
index 0000000..8006273
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java
@@ -0,0 +1,84 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor;
+
+/**
+ * This JSON accessor for PXF will read JSON data and pass it to a {@link 
JsonResolver}.
+ * 
+ * This accessor supports a single JSON record per line, or a multi-line JSON 
records if the <b>IDENTIFIER</b> parameter
+ * is set.
+ * 
+ * When provided the <b>IDENTIFIER</b> indicates the member name used to 
determine the encapsulating json object to
+ * return.
+ */
+public class JsonAccessor extends HdfsSplittableDataAccessor {
+
+       public static final String IDENTIFIER_PARAM = "IDENTIFIER";
+       public static final String RECORD_MAX_LENGTH_PARAM = "MAXLENGTH";
+
+       /**
+        * If provided indicates the member name which will be used to 
determine the encapsulating json object to return.
+        */
+       private String identifier = "";
+
+       /**
+        * Optional parameter that allows to define the max length of a json 
record. Records that exceed the allowed length
+        * are skipped. This parameter is applied only for the multi-line json 
records (e.g. when the IDENTIFIER is
+        * provided).
+        */
+       private int maxRecordLength = Integer.MAX_VALUE;
+
+       public JsonAccessor(InputData inputData) throws Exception {
+               // Because HdfsSplittableDataAccessor doesn't use the 
InputFormat we set it to null.
+               super(inputData, null);
+
+               if (!isEmpty(inputData.getUserProperty(IDENTIFIER_PARAM))) {
+
+                       identifier = 
inputData.getUserProperty(IDENTIFIER_PARAM);
+
+                       // If the member identifier is set then check if a 
record max length is defined as well.
+                       if 
(!isEmpty(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM))) {
+                               maxRecordLength = 
Integer.valueOf(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM));
+                       }
+               }
+       }
+
+       @Override
+       protected Object getReader(JobConf conf, InputSplit split) throws 
IOException {
+               if (!isEmpty(identifier)) {
+                       conf.set(JsonRecordReader.RECORD_MEMBER_IDENTIFIER, 
identifier);
+                       conf.setInt(JsonRecordReader.RECORD_MAX_LENGTH, 
maxRecordLength);
+                       return new JsonRecordReader(conf, (FileSplit) split);
+               } else {
+                       return new LineRecordReader(conf, (FileSplit) split);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
new file mode 100644
index 0000000..26d4c82
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
@@ -0,0 +1,176 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hawq.pxf.plugins.json.parser.PartitionedJsonParser;
+
+/**
+ * Multi-line json object reader. JsonRecordReader uses a member name (set by 
the <b>IDENTIFIER</b> PXF parameter) to
+ * determine the encapsulating object to extract and read.
+ * 
+ * JsonRecordReader supports compressed input files as well.
+ * 
+ * As a safe guard set the optional <b>MAXLENGTH</b> parameter to limit the 
max size of a record.
+ */
+public class JsonRecordReader implements RecordReader<LongWritable, Text> {
+
+       private static final Log LOG = 
LogFactory.getLog(JsonRecordReader.class);
+
+       public static final String RECORD_MEMBER_IDENTIFIER = 
"json.input.format.record.identifier";
+       public static final String RECORD_MAX_LENGTH = 
"multilinejsonrecordreader.maxlength";
+
+       private CompressionCodecFactory compressionCodecs = null;
+       private long start;
+       private long pos;
+       private long end;
+       private int maxObjectLength;
+       private InputStream is;
+       private PartitionedJsonParser parser;
+       private final String jsonMemberName;
+
+       /**
+        * Create new multi-line json object reader.
+        * 
+        * @param conf
+        *            Hadoop context
+        * @param split
+        *            HDFS split to start the reading from
+        * @throws IOException
+        */
+       public JsonRecordReader(JobConf conf, FileSplit split) throws 
IOException {
+
+               this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER);
+               this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, 
Integer.MAX_VALUE);
+
+               start = split.getStart();
+               end = start + split.getLength();
+               final Path file = split.getPath();
+               compressionCodecs = new CompressionCodecFactory(conf);
+               final CompressionCodec codec = compressionCodecs.getCodec(file);
+
+               // open the file and seek to the start of the split
+               FileSystem fs = file.getFileSystem(conf);
+               FSDataInputStream fileIn = fs.open(split.getPath());
+               if (codec != null) {
+                       is = codec.createInputStream(fileIn);
+                       start = 0;
+                       end = Long.MAX_VALUE;
+               } else {
+                       if (start != 0) {
+                               fileIn.seek(start);
+                       }
+                       is = fileIn;
+               }
+               parser = new PartitionedJsonParser(is);
+               this.pos = start;
+       }
+
+       /*
+        * {@inheritDoc}
+        */
+       @Override
+       public boolean next(LongWritable key, Text value) throws IOException {
+
+               while (pos < end) {
+
+                       String json = 
parser.nextObjectContainingMember(jsonMemberName);
+                       pos = start + parser.getBytesRead();
+
+                       if (json == null) {
+                               return false;
+                       }
+
+                       long jsonStart = pos - json.length();
+
+                       // if the "begin-object" position is after the end of 
our split, we should ignore it
+                       if (jsonStart >= end) {
+                               return false;
+                       }
+
+                       if (json.length() > maxObjectLength) {
+                               LOG.warn("Skipped JSON object of size " + 
json.length() + " at pos " + jsonStart);
+                       } else {
+                               key.set(jsonStart);
+                               value.set(json);
+                               return true;
+                       }
+               }
+
+               return false;
+       }
+
+       /*
+        * {@inheritDoc}
+        */
+       @Override
+       public LongWritable createKey() {
+               return new LongWritable();
+       }
+
+       /*
+        * {@inheritDoc}
+        */
+       @Override
+       public Text createValue() {
+               return new Text();
+       }
+
+       @Override
+       public long getPos() throws IOException {
+               return pos;
+       }
+
+       /*
+        * {@inheritDoc}
+        */
+       @Override
+       public synchronized void close() throws IOException {
+               if (is != null) {
+                       is.close();
+               }
+       }
+
+       /*
+        * {@inheritDoc}
+        */
+       @Override
+       public float getProgress() throws IOException {
+               if (start == end) {
+                       return 0.0f;
+               } else {
+                       return Math.min(1.0f, (pos - start) / (float) (end - 
start));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
new file mode 100644
index 0000000..21db6b7
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java
@@ -0,0 +1,256 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.ColumnDescriptor;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.api.utilities.Plugin;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.map.ObjectMapper;
+
+/**
+ * This JSON resolver for PXF will decode a given object from the {@link 
JsonAccessor} into a row for HAWQ. It will
+ * decode this data into a JsonNode and walk the tree for each column. It 
supports normal value mapping via projections
+ * and JSON array indexing.
+ */
+public class JsonResolver extends Plugin implements ReadResolver {
+
+       private static final Log LOG = LogFactory.getLog(JsonResolver.class);
+
+       private ArrayList<OneField> oneFieldList;
+       private ColumnDescriptorCache[] columnDescriptorCache;
+       private ObjectMapper mapper;
+
+       /**
+        * Row with empty fields. Returned in case of broken or malformed json 
records.
+        */
+       private final List<OneField> emptyRow;
+
+       public JsonResolver(InputData inputData) throws Exception {
+               super(inputData);
+               oneFieldList = new ArrayList<OneField>();
+               mapper = new ObjectMapper(new JsonFactory());
+
+               // Precompute the column metadata. The metadata is used for 
mapping column names to json nodes.
+               columnDescriptorCache = new 
ColumnDescriptorCache[inputData.getColumns()];
+               for (int i = 0; i < inputData.getColumns(); ++i) {
+                       ColumnDescriptor cd = inputData.getColumn(i);
+                       columnDescriptorCache[i] = new 
ColumnDescriptorCache(cd);
+               }
+
+               emptyRow = createEmptyRow();
+       }
+
+       @Override
+       public List<OneField> getFields(OneRow row) throws Exception {
+               oneFieldList.clear();
+
+               String jsonRecordAsText = row.getData().toString();
+
+               JsonNode root = decodeLineToJsonNode(jsonRecordAsText);
+
+               if (root == null) {
+                       LOG.warn("Return empty-fields row due to invalid JSON: 
" + jsonRecordAsText);
+                       return emptyRow;
+               }
+
+               // Iterate through the column definition and fetch our JSON data
+               for (ColumnDescriptorCache columnMetadata : 
columnDescriptorCache) {
+
+                       JsonNode node = getChildJsonNode(root, 
columnMetadata.getNormalizedProjections());
+
+                       // If this node is null or missing, add a null value 
here
+                       if (node == null || node.isMissingNode()) {
+                               addNullField(columnMetadata.getColumnType());
+                       } else if (columnMetadata.isArray()) {
+                               // If this column is an array index, ex. 
"tweet.hashtags[0]"
+                               if (node.isArray()) {
+                                       // If the JSON node is an array, then 
add it to our list
+                                       
addFieldFromJsonArray(columnMetadata.getColumnType(), node, 
columnMetadata.getArrayNodeIndex());
+                               } else {
+                                       throw new 
IllegalStateException(columnMetadata.getColumnName() + " is not an array node");
+                               }
+                       } else {
+                               // This column is not an array type
+                               // Add the value to the record
+                               
addFieldFromJsonNode(columnMetadata.getColumnType(), node);
+                       }
+               }
+
+               return oneFieldList;
+       }
+
+       /**
+        * @return Returns a row comprised of typed, empty fields. Used as a 
result of broken/malformed json records.
+        */
+       private List<OneField> createEmptyRow() {
+               ArrayList<OneField> emptyFieldList = new ArrayList<OneField>();
+               for (ColumnDescriptorCache column : columnDescriptorCache) {
+                       emptyFieldList.add(new 
OneField(column.getColumnType().getOID(), null));
+               }
+               return emptyFieldList;
+       }
+
+       /**
+        * Iterates down the root node to the child JSON node defined by the 
projs path.
+        * 
+        * @param root
+        *            node to to start the traversal from.
+        * @param projs
+        *            defines the path from the root to the desired child node.
+        * @return Returns the child node defined by the root and projs path.
+        */
+       private JsonNode getChildJsonNode(JsonNode root, String[] projs) {
+
+               // Iterate through all the tokens to the desired JSON node
+               JsonNode node = root;
+               for (int j = 0; j < projs.length; ++j) {
+                       node = node.path(projs[j]);
+               }
+
+               return node;
+       }
+
+       /**
+        * Iterates through the given JSON node to the proper index and adds 
the field of corresponding type
+        * 
+        * @param type
+        *            The {@link DataType} type
+        * @param node
+        *            The JSON array node
+        * @param index
+        *            The array index to iterate to
+        * @throws IOException
+        */
+       private void addFieldFromJsonArray(DataType type, JsonNode node, int 
index) throws IOException {
+
+               int count = 0;
+               boolean added = false;
+               for (Iterator<JsonNode> arrayNodes = node.getElements(); 
arrayNodes.hasNext();) {
+                       JsonNode arrayNode = arrayNodes.next();
+
+                       if (count == index) {
+                               added = true;
+                               addFieldFromJsonNode(type, arrayNode);
+                               break;
+                       }
+
+                       ++count;
+               }
+
+               // if we reached the end of the array without adding a field, 
add null
+               if (!added) {
+                       addNullField(type);
+               }
+       }
+
+       /**
+        * Adds a field from a given JSON node value based on the {@link 
DataType} type.
+        * 
+        * @param type
+        *            The DataType type
+        * @param val
+        *            The JSON node to extract the value.
+        * @throws IOException
+        */
+       private void addFieldFromJsonNode(DataType type, JsonNode val) throws 
IOException {
+               OneField oneField = new OneField();
+               oneField.type = type.getOID();
+
+               if (val.isNull()) {
+                       oneField.val = null;
+               } else {
+                       switch (type) {
+                       case BIGINT:
+                               oneField.val = val.asLong();
+                               break;
+                       case BOOLEAN:
+                               oneField.val = val.asBoolean();
+                               break;
+                       case CHAR:
+                               oneField.val = val.asText().charAt(0);
+                               break;
+                       case BYTEA:
+                               oneField.val = val.asText().getBytes();
+                               break;
+                       case FLOAT8:
+                               oneField.val = val.asDouble();
+                               break;
+                       case REAL:
+                               oneField.val = (float)val.asDouble();
+                               break;
+                       case INTEGER:
+                               oneField.val = val.asInt();
+                               break;
+                       case SMALLINT:
+                               oneField.val = (short)val.asInt();
+                               break;
+                       case BPCHAR:
+                       case TEXT:
+                       case VARCHAR:
+                               oneField.val = val.asText();
+                               break;
+                       default:
+                               throw new IOException("Unsupported type " + 
type);
+                       }
+               }
+
+               oneFieldList.add(oneField);
+       }
+
+       /**
+        * Adds a null field of the given type.
+        * 
+        * @param type
+        *            The {@link DataType} type
+        */
+       private void addNullField(DataType type) {
+               oneFieldList.add(new OneField(type.getOID(), null));
+       }
+
+       /**
+        * Converts the input line parameter into {@link JsonNode} instance.
+        * 
+        * @param line
+        *            JSON text
+        * @return Returns a {@link JsonNode} that represents the input line or 
null for invalid json.
+        */
+       private JsonNode decodeLineToJsonNode(String line) {
+
+               try {
+                       return mapper.readTree(line);
+               } catch (Exception e) {
+                       LOG.error("Failed to parse JSON object", e);
+                       return null;
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
new file mode 100644
index 0000000..616312d
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java
@@ -0,0 +1,175 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A very loosey-goosey lexer that doesn't enforce any JSON structural rules
+ */
+public class JsonLexer {
+
+       /**
+        * The current Lexer state.
+        */
+       private JsonLexerState state;
+
+       /**
+        * Create a new lexer with {@link JsonLexerState#NULL} initial state
+        */
+       public JsonLexer() {
+               this(JsonLexerState.NULL);
+       }
+
+       /**
+        * Create a new lexer with initial state
+        * 
+        * @param initState
+        *            Lexer initial state
+        */
+       public JsonLexer(JsonLexerState initState) {
+               state = initState;
+       }
+
+       /**
+        * @return current lexer state
+        */
+       public JsonLexerState getState() {
+               return state;
+       }
+
+       /**
+        * Change Lexer's {@link #state}
+        * 
+        * @param state
+        *            New lexer state
+        */
+       public void setState(JsonLexerState state) {
+               this.state = state;
+       }
+
+       /**
+        * Represents the possible states of a cursor can take in a JSON 
document.
+        */
+       public static enum JsonLexerState {
+               NULL,
+
+               DONT_CARE,
+
+               BEGIN_OBJECT,
+
+               END_OBJECT,
+
+               BEGIN_STRING,
+
+               END_STRING,
+
+               INSIDE_STRING,
+
+               STRING_ESCAPE,
+
+               VALUE_SEPARATOR,
+
+               NAME_SEPARATOR,
+
+               BEGIN_ARRAY,
+
+               END_ARRAY,
+
+               WHITESPACE
+       }
+
+       /**
+        * Given the current lexer state and the next cursor position computes 
the next {@link #state}.
+        * 
+        * @param c
+        *            next character the cursor is moved to
+        */
+       public void lex(char c) {
+               switch (state) {
+               case NULL:
+               case BEGIN_OBJECT:
+               case END_OBJECT:
+               case BEGIN_ARRAY:
+               case END_ARRAY:
+               case END_STRING:
+               case VALUE_SEPARATOR:
+               case NAME_SEPARATOR:
+               case DONT_CARE:
+               case WHITESPACE: {
+                       if (Character.isWhitespace(c)) {
+                               state = JsonLexerState.WHITESPACE;
+                               break;
+                       }
+                       switch (c) {
+                       // value-separator (comma)
+                       case ',':
+                               state = JsonLexerState.VALUE_SEPARATOR;
+                               break;
+                       // name-separator (colon)
+                       case ':':
+                               state = JsonLexerState.NAME_SEPARATOR;
+                               break;
+                       // string
+                       case '"':
+                               state = JsonLexerState.BEGIN_STRING;
+                               break;
+                       // start-object
+                       case '{':
+                               state = JsonLexerState.BEGIN_OBJECT;
+                               break;
+                       // end-object
+                       case '}':
+                               state = JsonLexerState.END_OBJECT;
+                               break;
+                       // begin-array
+                       case '[':
+                               state = JsonLexerState.BEGIN_ARRAY;
+                               break;
+                       // end-array
+                       case ']':
+                               state = JsonLexerState.END_ARRAY;
+                               break;
+                       default:
+                               state = JsonLexerState.DONT_CARE;
+                       }
+                       break;
+               }
+               case BEGIN_STRING:
+               case INSIDE_STRING: {
+                       state = JsonLexerState.INSIDE_STRING;
+                       // we will now enter the STRING state below
+
+                       switch (c) {
+                       // end-string
+                       case '"':
+                               state = JsonLexerState.END_STRING;
+                               break;
+                       // escape
+                       case '\\':
+                               state = JsonLexerState.STRING_ESCAPE;
+                       }
+                       break;
+               }
+               case STRING_ESCAPE: {
+                       state = JsonLexerState.INSIDE_STRING;
+                       break;
+               }
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
new file mode 100644
index 0000000..71ad449
--- /dev/null
+++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java
@@ -0,0 +1,200 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+
+import org.apache.hawq.pxf.plugins.json.parser.JsonLexer.JsonLexerState;
+
+/**
+ * A simple parser that can support reading JSON objects from a random point 
in JSON text. It reads from the supplied
+ * stream (which is assumed to be positioned at any arbitrary position inside 
some JSON text) until it finds the first
+ * JSON begin-object "{". From this point on it will keep reading JSON objects 
until it finds one containing a member
+ * string that the user supplies.
+ * <p/>
+ * It is not recommended to use this with JSON text where individual JSON 
objects that can be large (MB's or larger).
+ */
+public class PartitionedJsonParser {
+
+       private static final char BACKSLASH = '\\';
+       private static final char START_BRACE = '{';
+       private static final int EOF = -1;
+       private final InputStreamReader inputStreamReader;
+       private final JsonLexer lexer;
+       private long bytesRead = 0;
+       private boolean endOfStream = false;
+
+       public PartitionedJsonParser(InputStream is) {
+               this.lexer = new JsonLexer();
+
+               // You need to wrap the InputStream with an InputStreamReader, 
so that it can encode the incoming byte stream as
+               // UTF-8 characters
+               this.inputStreamReader = new InputStreamReader(is, 
StandardCharsets.UTF_8);
+       }
+
+       private boolean scanToFirstBeginObject() throws IOException {
+               // seek until we hit the first begin-object
+               char prev = ' ';
+               int i;
+               while ((i = inputStreamReader.read()) != EOF) {
+                       char c = (char) i;
+                       bytesRead++;
+                       if (c == START_BRACE && prev != BACKSLASH) {
+                               
lexer.setState(JsonLexer.JsonLexerState.BEGIN_OBJECT);
+                               return true;
+                       }
+                       prev = c;
+               }
+               endOfStream = true;
+               return false;
+       }
+
+       private enum MemberSearchState {
+               FOUND_STRING_NAME,
+
+               SEARCHING,
+
+               IN_MATCHING_OBJECT
+       }
+
+       private static final EnumSet<JsonLexerState> inStringStates = 
EnumSet.of(JsonLexerState.INSIDE_STRING,
+                       JsonLexerState.STRING_ESCAPE);
+
+       /**
+        * @param memberName
+        *            Indicates the member name used to determine the 
encapsulating object to return.
+        * @return Returns next json object that contains a member attribute 
with name: memberName. Returns null if no such
+        *         object is found or the end of the stream is reached.
+        * @throws IOException
+        */
+       public String nextObjectContainingMember(String memberName) throws 
IOException {
+
+               if (endOfStream) {
+                       return null;
+               }
+
+               int i;
+               int objectCount = 0;
+               StringBuilder currentObject = new StringBuilder();
+               StringBuilder currentString = new StringBuilder();
+               MemberSearchState memberState = MemberSearchState.SEARCHING;
+
+               List<Integer> objectStack = new ArrayList<Integer>();
+
+               if (!scanToFirstBeginObject()) {
+                       return null;
+               }
+               currentObject.append(START_BRACE);
+               objectStack.add(0);
+
+               while ((i = inputStreamReader.read()) != EOF) {
+                       char c = (char) i;
+                       bytesRead++;
+
+                       lexer.lex(c);
+
+                       currentObject.append(c);
+
+                       switch (memberState) {
+                       case SEARCHING:
+                               if (lexer.getState() == 
JsonLexerState.BEGIN_STRING) {
+                                       // we found the start of a string, so 
reset our string buffer
+                                       currentString.setLength(0);
+                               } else if 
(inStringStates.contains(lexer.getState())) {
+                                       // we're still inside a string, so keep 
appending to our buffer
+                                       currentString.append(c);
+                               } else if (lexer.getState() == 
JsonLexerState.END_STRING && memberName.equals(currentString.toString())) {
+
+                                       if (objectStack.size() > 0) {
+                                               // we hit the end of the string 
and it matched the member name (yay)
+                                               memberState = 
MemberSearchState.FOUND_STRING_NAME;
+                                               currentString.setLength(0);
+                                       }
+                               } else if (lexer.getState() == 
JsonLexerState.BEGIN_OBJECT) {
+                                       // we are searching and found a '{', so 
we reset the current object string
+                                       if (objectStack.size() == 0) {
+                                               currentObject.setLength(0);
+                                               
currentObject.append(START_BRACE);
+                                       }
+                                       objectStack.add(currentObject.length() 
- 1);
+                               } else if (lexer.getState() == 
JsonLexerState.END_OBJECT) {
+                                       if (objectStack.size() > 0) {
+                                               
objectStack.remove(objectStack.size() - 1);
+                                       }
+                                       if (objectStack.size() == 0) {
+                                               currentObject.setLength(0);
+                                       }
+                               }
+                               break;
+                       case FOUND_STRING_NAME:
+                               // keep popping whitespaces until we hit a 
different token
+                               if (lexer.getState() != 
JsonLexerState.WHITESPACE) {
+                                       if (lexer.getState() == 
JsonLexerState.NAME_SEPARATOR) {
+                                               // found our member!
+                                               memberState = 
MemberSearchState.IN_MATCHING_OBJECT;
+                                               objectCount = 0;
+
+                                               if (objectStack.size() > 1) {
+                                                       currentObject.delete(0, 
objectStack.get(objectStack.size() - 1));
+                                               }
+                                               objectStack.clear();
+                                       } else {
+                                               // we didn't find a 
value-separator (:), so our string wasn't a member string. keep searching
+                                               memberState = 
MemberSearchState.SEARCHING;
+                                       }
+                               }
+                               break;
+                       case IN_MATCHING_OBJECT:
+                               if (lexer.getState() == 
JsonLexerState.BEGIN_OBJECT) {
+                                       objectCount++;
+                               } else if (lexer.getState() == 
JsonLexerState.END_OBJECT) {
+                                       objectCount--;
+                                       if (objectCount < 0) {
+                                               // we're done! we reached an 
"}" which is at the same level as the member we found
+                                               return currentObject.toString();
+                                       }
+                               }
+                               break;
+                       }
+               }
+               endOfStream = true;
+               return null;
+       }
+
+       /**
+        * @return Returns the number of bytes read from the stream.
+        */
+       public long getBytesRead() {
+               return bytesRead;
+       }
+
+       /**
+        * @return Returns true if the end of the stream has been reached and 
false otherwise.
+        */
+       public boolean isEndOfStream() {
+               return endOfStream;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
new file mode 100644
index 0000000..a8161c1
--- /dev/null
+++ 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java
@@ -0,0 +1,272 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenter;
+import org.apache.hawq.pxf.plugins.json.JsonAccessor;
+import org.apache.hawq.pxf.plugins.json.JsonResolver;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class JsonExtensionTest extends PxfUnit {
+
+       private static final String IDENTIFIER = JsonAccessor.IDENTIFIER_PARAM;
+       private List<Pair<String, DataType>> columnDefs = null;
+       private List<Pair<String, String>> extraParams = new 
ArrayList<Pair<String, String>>();
+       private List<String> output = new ArrayList<String>();
+
+       @Before
+       public void before() {
+
+               columnDefs = new ArrayList<Pair<String, DataType>>();
+
+               columnDefs.add(new Pair<String, DataType>("created_at", 
DataType.TEXT));
+               columnDefs.add(new Pair<String, DataType>("id", 
DataType.BIGINT));
+               columnDefs.add(new Pair<String, DataType>("text", 
DataType.TEXT));
+               columnDefs.add(new Pair<String, DataType>("user.screen_name", 
DataType.TEXT));
+               columnDefs.add(new Pair<String, 
DataType>("entities.hashtags[0]", DataType.TEXT));
+               columnDefs.add(new Pair<String, 
DataType>("coordinates.coordinates[0]", DataType.FLOAT8));
+               columnDefs.add(new Pair<String, 
DataType>("coordinates.coordinates[1]", DataType.FLOAT8));
+
+               output.clear();
+               extraParams.clear();
+       }
+
+       @After
+       public void cleanup() throws Exception {
+               columnDefs.clear();
+       }
+
+       @Test
+       public void testCompressedMultilineJsonFile() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/tweets.tar.gz"), output);
+       }
+
+       @Test
+       public void testMaxRecordLength() throws Exception {
+
+               // variable-size-objects.json contains 3 json objects but only 
2 of them fit in the 27 byte length limitation
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, "key666"));
+               extraParams.add(new Pair<String, String>("MAXLENGTH", "27"));
+
+               columnDefs.clear();
+               columnDefs.add(new Pair<String, DataType>("key666", 
DataType.TEXT));
+
+               output.add("small object1");
+               // skip the large object2 
XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
+               output.add("small object3");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + 
"src/test/resources/variable-size-objects.json"), output);
+       }
+
+       @Test
+       public void testDataTypes() throws Exception {
+
+               // TDOO: The BYTEA type is not tested. The implementation 
(val.asText().getBytes()) returns an array reference
+               // and it is not clear whether this is the desired behavior.
+               //
+               // For the time being avoid using BYTEA type!!!
+
+               // This test also verifies that the order of the columns in the 
table definition agnostic to the order of the
+               // json attributes.
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"bintType"));
+
+               columnDefs.clear();
+
+               columnDefs.add(new Pair<String, DataType>("text", 
DataType.TEXT));
+               columnDefs.add(new Pair<String, DataType>("varcharType", 
DataType.VARCHAR));
+               columnDefs.add(new Pair<String, DataType>("bpcharType", 
DataType.BPCHAR));
+               columnDefs.add(new Pair<String, DataType>("smallintType", 
DataType.SMALLINT));
+               columnDefs.add(new Pair<String, DataType>("integerType", 
DataType.INTEGER));
+               columnDefs.add(new Pair<String, DataType>("realType", 
DataType.REAL));
+               columnDefs.add(new Pair<String, DataType>("float8Type", 
DataType.FLOAT8));
+               // The DataType.BYTEA type is left out for further validation.
+               columnDefs.add(new Pair<String, DataType>("charType", 
DataType.CHAR));
+               columnDefs.add(new Pair<String, DataType>("booleanType", 
DataType.BOOLEAN));
+               columnDefs.add(new Pair<String, DataType>("bintType", 
DataType.BIGINT));
+
+               
output.add(",varcharType,bpcharType,777,999,3.15,3.14,x,true,666");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/datatypes-test.json"), 
output);
+       }
+
+       @Test(expected = IllegalStateException.class)
+       public void testMissingArrayJsonAttribute() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               columnDefs.clear();
+
+               columnDefs.add(new Pair<String, DataType>("created_at", 
DataType.TEXT));
+               // User is not an array! An attempt to access it should throw 
an exception!
+               columnDefs.add(new Pair<String, DataType>("user[0]", 
DataType.TEXT));
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + 
"src/test/resources/tweets-with-missing-text-attribtute.json"), output);
+       }
+
+       @Test
+       public void testMissingJsonAttribute() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               // Missing attributes are substituted by an empty field
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,,SpreadButter,tweetCongress,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + 
"src/test/resources/tweets-with-missing-text-attribtute.json"), output);
+       }
+
+       @Test
+       public void testMalformedJsonObject() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add(",,,,,,"); // Expected: malformed json records are 
transformed into empty rows
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/tweets-broken.json"), 
output);
+       }
+
+       @Test
+       public void testSmallTweets() throws Exception {
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+               output.add("Fri Jun 07 22:45:03 +0000 
2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/tweets-small.json"), 
output);
+       }
+
+       @Test
+       public void testTweetsWithNull() throws Exception {
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,,text2,patronusdeadly,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/null-tweets.json"), 
output);
+       }
+
+       @Test
+       public void testSmallTweetsWithDelete() throws Exception {
+
+               output.add(",,,,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + 
"src/test/resources/tweets-small-with-delete.json"), output);
+       }
+
+       @Test
+       public void testWellFormedJson() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + "src/test/resources/tweets-pp.json"), output);
+       }
+
+       @Test
+       public void testWellFormedJsonWithDelete() throws Exception {
+
+               extraParams.add(new Pair<String, String>(IDENTIFIER, 
"created_at"));
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertOutput(new Path(System.getProperty("user.dir") + 
File.separator
+                               + 
"src/test/resources/tweets-pp-with-delete.json"), output);
+       }
+
+       @Test
+       public void testMultipleFiles() throws Exception {
+
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+               output.add("Fri Jun 07 22:45:03 +0000 
2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103");
+               output.add(",,,,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547115253761,text1,SpreadButter,tweetCongress,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547123646465,text2,patronusdeadly,,,");
+               output.add("Fri Jun 07 22:45:02 +0000 
2013,343136547136233472,text3,NoSecrets_Vagas,,,");
+
+               super.assertUnorderedOutput(new 
Path(System.getProperty("user.dir") + File.separator
+                               + "src/test/resources/tweets-small*.json"), 
output);
+       }
+
+       @Override
+       public List<Pair<String, String>> getExtraParams() {
+               return extraParams;
+       }
+
+       @Override
+       public Class<? extends Fragmenter> getFragmenterClass() {
+               return HdfsDataFragmenter.class;
+       }
+
+       @Override
+       public Class<? extends ReadAccessor> getReadAccessorClass() {
+               return JsonAccessor.class;
+       }
+
+       @Override
+       public Class<? extends ReadResolver> getReadResolverClass() {
+               return JsonResolver.class;
+       }
+
+       @Override
+       public List<Pair<String, DataType>> getColumnDefinitions() {
+               return columnDefs;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
new file mode 100644
index 0000000..5669882
--- /dev/null
+++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java
@@ -0,0 +1,666 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.FragmentsResponse;
+import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+
+/**
+ * This abstract class contains a number of helpful utilities in developing a 
PXF extension for HAWQ. Extend this class
+ * and use the various <code>assert</code> methods to check given input 
against known output.
+ */
+public abstract class PxfUnit {
+
+       private static final Log LOG = LogFactory.getLog(PxfUnit.class);
+
+       private static JsonFactory factory = new JsonFactory();
+       private static ObjectMapper mapper = new ObjectMapper(factory);
+
+       protected static List<InputData> inputs = null;
+
+       /**
+        * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
+        * output testing.
+        * 
+        * @param input
+        *            Input records
+        * @param expectedOutput
+        *            File containing output to check
+        * @throws Exception
+        */
+       public void assertOutput(Path input, Path expectedOutput) throws 
Exception {
+
+               BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
+                               expectedOutput)));
+
+               List<String> outputLines = new ArrayList<String>();
+
+               String line;
+               while ((line = rdr.readLine()) != null) {
+                       outputLines.add(line);
+               }
+
+               assertOutput(input, outputLines);
+
+               rdr.close();
+       }
+
+       /**
+        * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
+        * parameter for output testing.
+        * 
+        * @param input
+        *            Input records
+        * @param expectedOutput
+        *            File containing output to check
+        * @throws Exception
+        */
+       public void assertOutput(Path input, List<String> expectedOutput) 
throws Exception {
+
+               setup(input);
+               List<String> actualOutput = new ArrayList<String>();
+               for (InputData data : inputs) {
+                       ReadAccessor accessor = getReadAccessor(data);
+                       ReadResolver resolver = getReadResolver(data);
+
+                       actualOutput.addAll(getAllOutput(accessor, resolver));
+               }
+
+               Assert.assertFalse("Output did not match expected output", 
compareOutput(expectedOutput, actualOutput));
+       }
+
+       /**
+        * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
+        * parameter for output testing.<br>
+        * <br>
+        * Ignores order of records.
+        * 
+        * @param input
+        *            Input records
+        * @param expectedOutput
+        *            File containing output to check
+        * @throws Exception
+        */
+       public void assertUnorderedOutput(Path input, Path expectedOutput) 
throws Exception {
+               BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
+                               expectedOutput)));
+
+               List<String> outputLines = new ArrayList<String>();
+
+               String line;
+               while ((line = rdr.readLine()) != null) {
+                       outputLines.add(line);
+               }
+
+               assertUnorderedOutput(input, outputLines);
+               rdr.close();
+       }
+
+       /**
+        * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
+        * output testing.<br>
+        * <br>
+        * Ignores order of records.
+        * 
+        * @param input
+        *            Input records
+        * @param expectedOutput
+        *            File containing output to check
+        * @throws Exception
+        */
+       public void assertUnorderedOutput(Path input, List<String> 
expectedOutput) throws Exception {
+
+               setup(input);
+
+               List<String> actualOutput = new ArrayList<String>();
+               for (InputData data : inputs) {
+                       ReadAccessor accessor = getReadAccessor(data);
+                       ReadResolver resolver = getReadResolver(data);
+
+                       actualOutput.addAll(getAllOutput(accessor, resolver));
+               }
+
+               Assert.assertFalse("Output did not match expected output", 
compareUnorderedOutput(expectedOutput, actualOutput));
+       }
+
+       /**
+        * Writes the output to the given output stream. Comma delimiter.
+        * 
+        * @param input
+        *            The input file
+        * @param output
+        *            The output stream
+        * @throws Exception
+        */
+       public void writeOutput(Path input, OutputStream output) throws 
Exception {
+
+               setup(input);
+
+               for (InputData data : inputs) {
+                       ReadAccessor accessor = getReadAccessor(data);
+                       ReadResolver resolver = getReadResolver(data);
+
+                       for (String line : getAllOutput(accessor, resolver)) {
+                               output.write((line + "\n").getBytes());
+                       }
+               }
+
+               output.flush();
+       }
+
+       /**
+        * Get the class of the implementation of Fragmenter to be tested.
+        * 
+        * @return The class
+        */
+       public Class<? extends Fragmenter> getFragmenterClass() {
+               return null;
+       }
+
+       /**
+        * Get the class of the implementation of ReadAccessor to be tested.
+        * 
+        * @return The class
+        */
+       public Class<? extends ReadAccessor> getReadAccessorClass() {
+               return null;
+       }
+
+       /**
+        * Get the class of the implementation of WriteAccessor to be tested.
+        * 
+        * @return The class
+        */
+       public Class<? extends WriteAccessor> getWriteAccessorClass() {
+               return null;
+       }
+
+       /**
+        * Get the class of the implementation of Resolver to be tested.
+        * 
+        * @return The class
+        */
+       public Class<? extends ReadResolver> getReadResolverClass() {
+               return null;
+       }
+
+       /**
+        * Get the class of the implementation of WriteResolver to be tested.
+        * 
+        * @return The class
+        */
+       public Class<? extends WriteResolver> getWriteResolverClass() {
+               return null;
+       }
+
+       /**
+        * Get any extra parameters that are meant to be specified for the 
"pxf" protocol. Note that "X-GP-" is prepended to
+        * each parameter name.
+        * 
+        * @return Any extra parameters or null if none.
+        */
+       public List<Pair<String, String>> getExtraParams() {
+               return null;
+       }
+
+       /**
+        * Gets the column definition names and data types. Types are DataType 
objects
+        * 
+        * @return A list of column definition name value pairs. Cannot be null.
+        */
+       public abstract List<Pair<String, DataType>> getColumnDefinitions();
+
+       protected InputData getInputDataForWritableTable() {
+               return getInputDataForWritableTable(null);
+       }
+
+       protected InputData getInputDataForWritableTable(Path input) {
+
+               if (getWriteAccessorClass() == null) {
+                       throw new IllegalArgumentException(
+                                       "getWriteAccessorClass() must be 
overwritten to return a non-null object");
+               }
+
+               if (getWriteResolverClass() == null) {
+                       throw new IllegalArgumentException(
+                                       "getWriteResolverClass() must be 
overwritten to return a non-null object");
+               }
+
+               Map<String, String> paramsMap = new HashMap<String, String>();
+
+               paramsMap.put("X-GP-ALIGNMENT", "what");
+               paramsMap.put("X-GP-SEGMENT-ID", "1");
+               paramsMap.put("X-GP-HAS-FILTER", "0");
+               paramsMap.put("X-GP-SEGMENT-COUNT", "1");
+
+               paramsMap.put("X-GP-FORMAT", "GPDBWritable");
+               paramsMap.put("X-GP-URL-HOST", "localhost");
+               paramsMap.put("X-GP-URL-PORT", "50070");
+
+               if (input == null) {
+                       paramsMap.put("X-GP-DATA-DIR", "/dummydata");
+               }
+
+               List<Pair<String, DataType>> params = getColumnDefinitions();
+               paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
+               for (int i = 0; i < params.size(); ++i) {
+                       paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
+                       paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
+                       paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
+               }
+
+               paramsMap.put("X-GP-ACCESSOR", 
getWriteAccessorClass().getName());
+               paramsMap.put("X-GP-RESOLVER", 
getWriteResolverClass().getName());
+
+               if (getExtraParams() != null) {
+                       for (Pair<String, String> param : getExtraParams()) {
+                               paramsMap.put("X-GP-" + param.first, 
param.second);
+                       }
+               }
+
+               return new ProtocolData(paramsMap);
+       }
+
+       /**
+        * Set all necessary parameters for GPXF framework to function. Uses 
the given path as a single input split.
+        * 
+        * @param input
+        *            The input path, relative or absolute.
+        * @throws Exception
+        */
+       protected void setup(Path input) throws Exception {
+
+               if (getFragmenterClass() == null) {
+                       throw new 
IllegalArgumentException("getFragmenterClass() must be overwritten to return a 
non-null object");
+               }
+
+               if (getReadAccessorClass() == null) {
+                       throw new 
IllegalArgumentException("getReadAccessorClass() must be overwritten to return 
a non-null object");
+               }
+
+               if (getReadResolverClass() == null) {
+                       throw new 
IllegalArgumentException("getReadResolverClass() must be overwritten to return 
a non-null object");
+               }
+
+               Map<String, String> paramsMap = new HashMap<String, String>();
+
+               // 2.1.0 Properties
+               // HDMetaData parameters
+               paramsMap.put("X-GP-ALIGNMENT", "what");
+               paramsMap.put("X-GP-SEGMENT-ID", "1");
+               paramsMap.put("X-GP-HAS-FILTER", "0");
+               paramsMap.put("X-GP-SEGMENT-COUNT", "1");
+               paramsMap.put("X-GP-FRAGMENTER", 
getFragmenterClass().getName());
+               paramsMap.put("X-GP-FORMAT", "GPDBWritable");
+               paramsMap.put("X-GP-URL-HOST", "localhost");
+               paramsMap.put("X-GP-URL-PORT", "50070");
+
+               paramsMap.put("X-GP-DATA-DIR", input.toString());
+
+               List<Pair<String, DataType>> params = getColumnDefinitions();
+               paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
+               for (int i = 0; i < params.size(); ++i) {
+                       paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
+                       paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
+                       paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
+               }
+
+               // HDFSMetaData properties
+               paramsMap.put("X-GP-ACCESSOR", 
getReadAccessorClass().getName());
+               paramsMap.put("X-GP-RESOLVER", 
getReadResolverClass().getName());
+
+               if (getExtraParams() != null) {
+                       for (Pair<String, String> param : getExtraParams()) {
+                               paramsMap.put("X-GP-" + param.first, 
param.second);
+                       }
+               }
+
+               LocalInputData fragmentInputData = new 
LocalInputData(paramsMap);
+
+               List<Fragment> fragments = 
getFragmenter(fragmentInputData).getFragments();
+
+               FragmentsResponse fragmentsResponse = 
FragmentsResponseFormatter.formatResponse(fragments, input.toString());
+
+               ByteArrayOutputStream baos = new ByteArrayOutputStream();
+               fragmentsResponse.write(baos);
+
+               String jsonOutput = baos.toString();
+
+               inputs = new ArrayList<InputData>();
+
+               JsonNode node = decodeLineToJsonNode(jsonOutput);
+
+               JsonNode fragmentsArray = node.get("PXFFragments");
+               int i = 0;
+               Iterator<JsonNode> iter = fragmentsArray.getElements();
+               while (iter.hasNext()) {
+                       JsonNode fragNode = iter.next();
+                       String sourceData = 
fragNode.get("sourceName").getTextValue();
+                       if (!sourceData.startsWith("/")) {
+                               sourceData = "/" + sourceData;
+                       }
+                       paramsMap.put("X-GP-DATA-DIR", sourceData);
+                       paramsMap.put("X-GP-FRAGMENT-METADATA", 
fragNode.get("metadata").getTextValue());
+                       paramsMap.put("X-GP-DATA-FRAGMENT", 
Integer.toString(i++));
+                       inputs.add(new LocalInputData(paramsMap));
+               }
+       }
+
+       private JsonNode decodeLineToJsonNode(String line) {
+               try {
+                       return mapper.readTree(line);
+               } catch (Exception e) {
+                       LOG.warn(e);
+                       return null;
+               }
+       }
+
+       /**
+        * Compares the expected and actual output, printing out any errors.
+        * 
+        * @param expectedOutput
+        *            The expected output
+        * @param actualOutput
+        *            The actual output
+        * @return True if no errors, false otherwise.
+        */
+       protected boolean compareOutput(List<String> expectedOutput, 
List<String> actualOutput) {
+
+               boolean error = false;
+               for (int i = 0; i < expectedOutput.size(); ++i) {
+                       boolean match = false;
+                       for (int j = 0; j < actualOutput.size(); ++j) {
+                               if 
(expectedOutput.get(i).equals(actualOutput.get(j))) {
+                                       match = true;
+                                       if (i != j) {
+                                               LOG.error("Expected (" + 
expectedOutput.get(i) + ") matched (" + actualOutput.get(j)
+                                                               + ") but in 
wrong place.  " + j + " instead of " + i);
+                                               error = true;
+                                       }
+
+                                       break;
+                               }
+                       }
+
+                       if (!match) {
+                               LOG.error("Missing expected output: (" + 
expectedOutput.get(i) + ")");
+                               error = true;
+                       }
+               }
+
+               for (int i = 0; i < actualOutput.size(); ++i) {
+                       boolean match = false;
+                       for (int j = 0; j < expectedOutput.size(); ++j) {
+                               if 
(actualOutput.get(i).equals(expectedOutput.get(j))) {
+                                       match = true;
+                                       break;
+                               }
+                       }
+
+                       if (!match) {
+                               LOG.error("Received unexpected output: (" + 
actualOutput.get(i) + ")");
+                               error = true;
+                       }
+               }
+
+               return error;
+       }
+
+       /**
+        * Compares the expected and actual output, printing out any errors.
+        * 
+        * @param expectedOutput
+        *            The expected output
+        * @param actualOutput
+        *            The actual output
+        * @return True if no errors, false otherwise.
+        */
+       protected boolean compareUnorderedOutput(List<String> expectedOutput, 
List<String> actualOutput) {
+
+               boolean error = false;
+               for (int i = 0; i < expectedOutput.size(); ++i) {
+                       boolean match = false;
+                       for (int j = 0; j < actualOutput.size(); ++j) {
+                               if 
(expectedOutput.get(i).equals(actualOutput.get(j))) {
+                                       match = true;
+                                       break;
+                               }
+                       }
+
+                       if (!match) {
+                               LOG.error("Missing expected output: (" + 
expectedOutput.get(i) + ")");
+                               error = true;
+                       }
+               }
+
+               for (int i = 0; i < actualOutput.size(); ++i) {
+                       boolean match = false;
+                       for (int j = 0; j < expectedOutput.size(); ++j) {
+                               if 
(actualOutput.get(i).equals(expectedOutput.get(j))) {
+                                       match = true;
+                                       break;
+                               }
+                       }
+
+                       if (!match) {
+                               LOG.error("Received unexpected output: (" + 
actualOutput.get(i) + ")");
+                               error = true;
+                       }
+               }
+
+               return error;
+       }
+
+       /**
+        * Opens the accessor and reads all output, giving it to the resolver 
to retrieve the list of fields. These fields
+        * are then added to a string, delimited by commas, and returned in a 
list.
+        * 
+        * @param accessor
+        *            The accessor instance to use
+        * @param resolver
+        *            The resolver instance to use
+        * @return The list of output strings
+        * @throws Exception
+        */
+       protected List<String> getAllOutput(ReadAccessor accessor, ReadResolver 
resolver) throws Exception {
+
+               Assert.assertTrue("Accessor failed to open", 
accessor.openForRead());
+
+               List<String> output = new ArrayList<String>();
+
+               OneRow row = null;
+               while ((row = accessor.readNextObject()) != null) {
+
+                       StringBuilder bldr = new StringBuilder();
+                       for (OneField field : resolver.getFields(row)) {
+                               bldr.append((field != null && field.val != null 
? field.val : "") + ",");
+                       }
+
+                       if (bldr.length() > 0) {
+                               bldr.deleteCharAt(bldr.length() - 1);
+                       }
+
+                       output.add(bldr.toString());
+               }
+
+               accessor.closeForRead();
+
+               return output;
+       }
+
+       /**
+        * Gets an instance of Fragmenter via reflection.
+        * 
+        * Searches for a constructor that has a single parameter of some 
BaseMetaData type
+        * 
+        * @return A Fragmenter instance
+        * @throws Exception
+        *             If something bad happens
+        */
+       protected Fragmenter getFragmenter(InputData meta) throws Exception {
+
+               Fragmenter fragmenter = null;
+
+               for (Constructor<?> c : getFragmenterClass().getConstructors()) 
{
+                       if (c.getParameterTypes().length == 1) {
+                               for (Class<?> clazz : c.getParameterTypes()) {
+                                       if 
(InputData.class.isAssignableFrom(clazz)) {
+                                               fragmenter = (Fragmenter) 
c.newInstance(meta);
+                                       }
+                               }
+                       }
+               }
+
+               if (fragmenter == null) {
+                       throw new InvalidParameterException("Unable to find 
Fragmenter constructor with a BaseMetaData parameter");
+               }
+
+               return fragmenter;
+
+       }
+
+       /**
+        * Gets an instance of ReadAccessor via reflection.
+        * 
+        * Searches for a constructor that has a single parameter of some 
InputData type
+        * 
+        * @return An ReadAccessor instance
+        * @throws Exception
+        *             If something bad happens
+        */
+       protected ReadAccessor getReadAccessor(InputData data) throws Exception 
{
+
+               ReadAccessor accessor = null;
+
+               for (Constructor<?> c : 
getReadAccessorClass().getConstructors()) {
+                       if (c.getParameterTypes().length == 1) {
+                               for (Class<?> clazz : c.getParameterTypes()) {
+                                       if 
(InputData.class.isAssignableFrom(clazz)) {
+                                               accessor = (ReadAccessor) 
c.newInstance(data);
+                                       }
+                               }
+                       }
+               }
+
+               if (accessor == null) {
+                       throw new InvalidParameterException("Unable to find 
Accessor constructor with a BaseMetaData parameter");
+               }
+
+               return accessor;
+
+       }
+
+       /**
+        * Gets an instance of IFieldsResolver via reflection.
+        * 
+        * Searches for a constructor that has a single parameter of some 
BaseMetaData type
+        * 
+        * @return A IFieldsResolver instance
+        * @throws Exception
+        *             If something bad happens
+        */
+       protected ReadResolver getReadResolver(InputData data) throws Exception 
{
+
+               ReadResolver resolver = null;
+
+               // search for a constructor that has a single parameter of a 
type of
+               // BaseMetaData to create the accessor instance
+               for (Constructor<?> c : 
getReadResolverClass().getConstructors()) {
+                       if (c.getParameterTypes().length == 1) {
+                               for (Class<?> clazz : c.getParameterTypes()) {
+                                       if 
(InputData.class.isAssignableFrom(clazz)) {
+                                               resolver = (ReadResolver) 
c.newInstance(data);
+                                       }
+                               }
+                       }
+               }
+
+               if (resolver == null) {
+                       throw new InvalidParameterException("Unable to find 
Resolver constructor with a BaseMetaData parameter");
+               }
+
+               return resolver;
+       }
+
+       public static class Pair<FIRST, SECOND> {
+
+               public FIRST first;
+               public SECOND second;
+
+               public Pair() {
+               }
+
+               public Pair(FIRST f, SECOND s) {
+                       this.first = f;
+                       this.second = s;
+               }
+       }
+
+       /**
+        * An extension of InputData for the local file system instead of HDFS. 
Leveraged by the PXFUnit framework. Do not
+        * concern yourself with such a simple piece of code.
+        */
+       public static class LocalInputData extends ProtocolData {
+
+               public LocalInputData(ProtocolData copy) {
+                       super(copy);
+                       super.setDataSource(super.getDataSource().substring(1));
+               }
+
+               public LocalInputData(Map<String, String> paramsMap) {
+                       super(paramsMap);
+               }
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
new file mode 100644
index 0000000..aa8ea14
--- /dev/null
+++ 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java
@@ -0,0 +1,141 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class JsonLexerTest {
+
+       private static final Log LOG = LogFactory.getLog(JsonLexerTest.class);
+
+       @Test
+       public void testSimple() throws IOException {
+               File testsDir = new File("src/test/resources/lexer-tests");
+               File[] jsonFiles = testsDir.listFiles(new FilenameFilter() {
+                       public boolean accept(File file, String s) {
+                               return s.endsWith(".json");
+                       }
+               });
+
+               for (File jsonFile : jsonFiles) {
+                       File stateFile = new File(jsonFile.getAbsolutePath() + 
".state");
+                       if (stateFile.exists()) {
+                               runTest(jsonFile, stateFile);
+                       }
+               }
+       }
+
+       public static Pattern STATE_RECURRENCE = 
Pattern.compile("^([A-Za-z\\_0-9]+)\\{([0-9]+)\\}$");
+
+       public void runTest(File jsonFile, File stateFile) throws IOException {
+               List<String> lexerStates = FileUtils.readLines(stateFile);
+               InputStream jsonInputStream = new FileInputStream(jsonFile);
+
+               try {
+                       JsonLexer lexer = new JsonLexer();
+
+                       int byteOffset = 0;
+                       int i;
+                       ListIterator<String> stateIterator = 
lexerStates.listIterator();
+                       int recurrence = 0;
+                       JsonLexer.JsonLexerState expectedState = null;
+                       StringBuilder sb = new StringBuilder();
+                       int stateFileLineNum = 0;
+                       while ((i = jsonInputStream.read()) != -1) {
+                               byteOffset++;
+                               char c = (char) i;
+
+                               sb.append(c);
+
+                               lexer.lex(c);
+
+                               if (lexer.getState() == 
JsonLexer.JsonLexerState.WHITESPACE) {
+                                       // optimization to skip over multiple 
whitespaces
+                                       continue;
+                               }
+
+                               if (!stateIterator.hasNext()) {
+                                       assertFalse(formatStateInfo(jsonFile, 
sb.toString(), byteOffset, stateFileLineNum)
+                                                       + ": Input stream had 
character '" + c + "' but no matching state", true);
+                               }
+
+                               if (recurrence <= 0) {
+                                       String state = 
stateIterator.next().trim();
+                                       stateFileLineNum++;
+
+                                       while (state.equals("") || 
state.startsWith("#")) {
+                                               if (!stateIterator.hasNext()) {
+                                                       
assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, 
stateFileLineNum)
+                                                                       + ": 
Input stream had character '" + c + "' but no matching state", true);
+                                               }
+                                               state = 
stateIterator.next().trim();
+                                               stateFileLineNum++;
+                                       }
+
+                                       Matcher m = 
STATE_RECURRENCE.matcher(state);
+                                       recurrence = 1;
+                                       if (m.matches()) {
+                                               state = m.group(1);
+                                               recurrence = 
Integer.valueOf(m.group(2));
+                                       }
+                                       expectedState = 
JsonLexer.JsonLexerState.valueOf(state);
+                               }
+
+                               assertEquals(formatStateInfo(jsonFile, 
sb.toString(), byteOffset, stateFileLineNum)
+                                               + ": Issue for char '" + c + 
"'", expectedState, lexer.getState());
+                               recurrence--;
+                       }
+
+                       if (stateIterator.hasNext()) {
+                               assertFalse(formatStateInfo(jsonFile, 
sb.toString(), byteOffset, stateFileLineNum)
+                                               + ": Input stream has ended but 
more states were expected: '" + stateIterator.next() + "...'",
+                                               true);
+                       }
+
+               } finally {
+                       IOUtils.closeQuietly(jsonInputStream);
+               }
+
+               LOG.info("File " + jsonFile.getName() + " passed");
+
+       }
+
+       static String formatStateInfo(File jsonFile, String streamContents, int 
streamByteOffset, int stateFileLineNum) {
+               return jsonFile.getName() + ": Input stream currently at 
byte-offset " + streamByteOffset + ", contents = '"
+                               + streamContents + "'" + " state-file line = " 
+ stateFileLineNum;
+       }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
new file mode 100644
index 0000000..cdc876b
--- /dev/null
+++ 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java
@@ -0,0 +1,83 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+
+public class PartitionedJsonParserNoSeekTest {
+
+       private static final Log LOG = 
LogFactory.getLog(PartitionedJsonParserNoSeekTest.class);
+
+       @Test
+       public void testNoSeek() throws IOException {
+               File testsDir = new 
File("src/test/resources/parser-tests/noseek");
+               File[] jsonFiles = testsDir.listFiles(new FilenameFilter() {
+                       public boolean accept(File file, String s) {
+                               return s.endsWith(".json");
+                       }
+               });
+
+               for (File jsonFile : jsonFiles) {
+                       runTest(jsonFile);
+               }
+       }
+
+       public void runTest(final File jsonFile) throws IOException {
+               InputStream jsonInputStream = new FileInputStream(jsonFile);
+
+               try {
+                       PartitionedJsonParser parser = new 
PartitionedJsonParser(jsonInputStream);
+
+                       File[] jsonOjbectFiles = 
jsonFile.getParentFile().listFiles(new FilenameFilter() {
+                               public boolean accept(File file, String s) {
+                                       return s.contains(jsonFile.getName()) 
&& s.contains("expected");
+                               }
+                       });
+
+                       for (File jsonObjectFile : jsonOjbectFiles) {
+                               String expected = 
trimWhitespaces(FileUtils.readFileToString(jsonObjectFile));
+                               String result = 
parser.nextObjectContainingMember("name");
+                               assertNotNull(jsonFile.getName() + "/" + 
jsonObjectFile.getName(), result);
+                               assertEquals(jsonFile.getName() + "/" + 
jsonObjectFile.getName(), expected, trimWhitespaces(result));
+                               LOG.info("File " + jsonFile.getName() + "/" + 
jsonObjectFile.getName() + " passed");
+                       }
+
+               } finally {
+                       IOUtils.closeQuietly(jsonInputStream);
+               }
+       }
+
+       public String trimWhitespaces(String s) {
+               return s.replaceAll("[\\n\\t\\r \\t]+", " ").trim();
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
new file mode 100644
index 0000000..3a1b3b6
--- /dev/null
+++ 
b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java
@@ -0,0 +1,55 @@
+package org.apache.hawq.pxf.plugins.json.parser;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.junit.Test;
+
+public class PartitionedJsonParserOffsetTest {
+       /*
+        * [{"color": "red","v": "vv"},{"color": "red","v": "vv"}]
+        */
+       public static String json2 = "[{\"color\": \"red\",\"v\": 
\"vv\"},{\"color\": \"red\",\"v\": \"vv\"}]";
+
+       @Test
+       public void testOffset() throws IOException {
+               InputStream jsonInputStream = createFromString(json2);
+               PartitionedJsonParser parser = new 
PartitionedJsonParser(jsonInputStream);
+               String result = parser.nextObjectContainingMember("color");
+               assertNotNull(result);
+               assertEquals(27, parser.getBytesRead());
+               assertEquals(1, parser.getBytesRead() - result.length());
+               result = parser.nextObjectContainingMember("color");
+               assertNotNull(result);
+               assertEquals(54, parser.getBytesRead());
+               assertEquals(28, parser.getBytesRead() - result.length());
+               jsonInputStream.close();
+       }
+
+       public InputStream createFromString(String s) {
+               return new ByteArrayInputStream(s.getBytes());
+       }
+}

Reply via email to