HAWQ-178. Add JSON plugin support in code base.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/fd9c3686 Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/fd9c3686 Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/fd9c3686 Branch: refs/heads/master Commit: fd9c36861506ac94255a34c6a85307bf87ae0f72 Parents: c0d7c4f Author: Oleksandr Diachenko <[email protected]> Authored: Wed May 18 16:58:07 2016 -0700 Committer: Oleksandr Diachenko <[email protected]> Committed: Wed May 18 16:58:07 2016 -0700 ---------------------------------------------------------------------- pxf/build.gradle | 34 +- pxf/pxf-json/.gitignore | 1 + .../pxf/plugins/json/ColumnDescriptorCache.java | 119 ++++ .../hawq/pxf/plugins/json/JsonAccessor.java | 84 +++ .../hawq/pxf/plugins/json/JsonRecordReader.java | 176 +++++ .../hawq/pxf/plugins/json/JsonResolver.java | 256 +++++++ .../hawq/pxf/plugins/json/parser/JsonLexer.java | 175 +++++ .../json/parser/PartitionedJsonParser.java | 200 ++++++ .../pxf/plugins/json/JsonExtensionTest.java | 272 ++++++++ .../apache/hawq/pxf/plugins/json/PxfUnit.java | 666 +++++++++++++++++++ .../pxf/plugins/json/parser/JsonLexerTest.java | 141 ++++ .../parser/PartitionedJsonParserNoSeekTest.java | 83 +++ .../parser/PartitionedJsonParserOffsetTest.java | 55 ++ .../parser/PartitionedJsonParserSeekTest.java | 113 ++++ .../src/test/resources/datatypes-test.json | 14 + .../lexer-tests/array_objects_complex.json | 24 + .../array_objects_complex.json.state | 149 +++++ .../lexer-tests/array_objects_empty.json | 1 + .../lexer-tests/array_objects_empty.json.state | 7 + .../resources/lexer-tests/array_of_numbers.json | 1 + .../lexer-tests/array_of_numbers.json.state | 5 + .../resources/lexer-tests/object_complex.json | 22 + .../lexer-tests/object_complex.json.state | 146 ++++ .../resources/lexer-tests/object_simple.json | 4 + .../lexer-tests/object_simple.json.state | 24 + 
.../lexer-tests/object_string_escaping.json | 4 + .../object_string_escaping.json.state | 44 ++ .../src/test/resources/log4j.properties | 22 + .../src/test/resources/null-tweets.json | 2 + .../noseek/array_objects_complex.json | 24 + .../array_objects_complex.json.expected1.json | 22 + .../noseek/array_objects_complex2.json | 24 + .../array_objects_complex2.json.expected1.json | 4 + .../array_objects_complex2.json.expected2.json | 4 + .../array_objects_same_name_in_child.json | 24 + ...jects_same_name_in_child.json.expected1.json | 22 + .../parser-tests/noseek/object_simple.json | 4 + .../noseek/object_simple.json.expected1.json | 4 + .../noseek/object_string_escaping.json | 4 + .../object_string_escaping.json.expected1.json | 4 + .../child-object-before-member/expected.1.json | 44 ++ .../seek/child-object-before-member/input.json | 59 ++ .../child-object-before-member2/expected.1.json | 44 ++ .../seek/child-object-before-member2/input.json | 59 ++ .../parser-tests/seek/multi/expected.1.json | 22 + .../parser-tests/seek/multi/expected.2.json | 25 + .../parser-tests/seek/multi/input.json | 48 ++ .../parser-tests/seek/no-elements/input.json | 24 + .../seek/seek-into-mid-object-1/expected.1.json | 22 + .../seek/seek-into-mid-object-1/input.json | 28 + .../seek/seek-into-mid-object-2/expected.1.json | 22 + .../seek/seek-into-mid-object-2/input.json | 28 + .../seek/seek-into-mid-object-3/expected.1.json | 22 + .../seek/seek-into-mid-object-3/input.json | 28 + .../seek/seek-into-string-1/expected.1.json | 22 + .../seek/seek-into-string-1/input.json | 27 + .../seek/seek-into-string-2/expected.1.json | 22 + .../seek/seek-into-string-2/input.json | 27 + .../parser-tests/seek/simple/expected.1.json | 22 + .../parser-tests/seek/simple/input.json | 24 + .../src/test/resources/sample-malformed.json | 3 + pxf/pxf-json/src/test/resources/sample.json | 3 + .../src/test/resources/tweets-broken.json | 67 ++ .../test/resources/tweets-pp-with-delete.json | 79 +++ 
pxf/pxf-json/src/test/resources/tweets-pp.json | 67 ++ .../resources/tweets-small-with-delete.json | 4 + .../src/test/resources/tweets-small.json | 4 + .../tweets-with-missing-text-attribtute.json | 18 + pxf/pxf-json/src/test/resources/tweets.tar.gz | Bin 0 -> 796 bytes .../test/resources/variable-size-objects.json | 3 + .../src/main/resources/pxf-privatehdp.classpath | 1 + .../src/main/resources/pxf-privatephd.classpath | 1 + .../src/main/resources/pxf-profiles-default.xml | 14 + pxf/settings.gradle | 1 + 74 files changed, 3861 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/build.gradle ---------------------------------------------------------------------- diff --git a/pxf/build.gradle b/pxf/build.gradle index 3f3d31c..eb716fc 100644 --- a/pxf/build.gradle +++ b/pxf/build.gradle @@ -33,7 +33,7 @@ buildscript { url 'http://repository.jboss.org/nexus/content/groups/public' } } - + dependencies { classpath "com.netflix.nebula:gradle-ospackage-plugin:2.2.6" classpath "de.undercouch:gradle-download-task:2.1.0" @@ -87,7 +87,7 @@ subprojects { subProject -> testCompile 'org.powermock:powermock-api-mockito:1.5.1' testCompile 'org.mockito:mockito-core:1.9.5' } - + configurations.all { resolutionStrategy { // force versions that were specified in dependencies: @@ -337,6 +337,26 @@ project('pxf-hive') { } } +project('pxf-json') { + dependencies { + compile(project(':pxf-hdfs')) + compile(project(':pxf-service')) + compile "org.apache.commons:commons-lang3:3.0" + + testCompile 'pl.pragmatists:JUnitParams:1.0.2' + } + + ospackage { + requires('pxf-hdfs', project.version, GREATER | EQUAL) + + from(jar.outputs.files) { + into "/usr/lib/pxf-${project.version}" + } + + link("/usr/lib/pxf-${project.version}/${project.name}.jar", "${project.name}-${project.version}.jar") + } +} + project('pxf-hbase') { dependencies { compile(project(':pxf-api')) @@ -404,15 
+424,15 @@ task rpm(type: Copy, dependsOn: [subprojects.build, distSubprojects.buildRpm]) { def tomcatName = "apache-tomcat-${tomcatVersion}" def tomcatTargetDir = "tomcat/build/" - + task tomcatGet << { - + apply plugin: 'de.undercouch.download' def TarGzSuffix = ".tar.gz" def tomcatTar = "${tomcatName}${TarGzSuffix}" def tomcatUrl = "http://archive.apache.org/dist/tomcat/tomcat-7/v${tomcatVersion}/bin/${tomcatTar}" - + if (file("${tomcatTargetDir}/${tomcatName}").exists()) { println "${tomcatName} already exists, nothing to do" return 0 @@ -436,13 +456,13 @@ apply plugin: 'os-package' task tomcatRpm(type: Rpm) { buildDir = 'tomcat/build/' - + // clean should not delete the downloaded tarball // and RPM, so this is a bogus directory to delete instead. clean { delete = 'tomcat/build/something' } - + ospackage { packageName 'apache-tomcat' summary = 'Apache Tomcat RPM' http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/.gitignore ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/.gitignore b/pxf/pxf-json/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/pxf/pxf-json/.gitignore @@ -0,0 +1 @@ +/bin/ http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java new file mode 100644 index 0000000..01cd37c --- /dev/null +++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/ColumnDescriptorCache.java @@ -0,0 +1,119 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; + +/** + * Helper class used to retrieve all column details relevant for the json processing. + */ +public class ColumnDescriptorCache { + + private static Pattern ARRAY_PROJECTION_PATTERN = Pattern.compile("(.+)\\[([0-9]+)\\]"); + private static int ARRAY_NAME_GROUPID = 1; + private static int ARRAY_INDEX_GROUPID = 2; + + private final DataType columnType; + private final String[] normalizedProjection; + private final String arrayNodeName; + private final int arrayNodeIndex; + private final boolean isArray; + private String columnName; + + public ColumnDescriptorCache(ColumnDescriptor columnDescriptor) { + + // HAWQ column type + this.columnType = DataType.get(columnDescriptor.columnTypeCode()); + + this.columnName = columnDescriptor.columnName(); + + // Column name can use dot-name convention to specify a nested json node. + // Break the path into array of path steps called projections + String[] projection = columnDescriptor.columnName().split("\\."); + + // When the projection contains array reference (e.g. 
projections = foo.bar[66]) then replace the last path + // element by the array name (e.g. normalizedProjection = foo.bar) + normalizedProjection = new String[projection.length]; + + // Check if the provided json path (projections) refers to an array element. + Matcher matcher = ARRAY_PROJECTION_PATTERN.matcher(projection[projection.length - 1]); + if (matcher.matches()) { + this.isArray = true; + // extracts the array node name from the projection path + this.arrayNodeName = matcher.group(ARRAY_NAME_GROUPID); + // extracts the array index from the projection path + this.arrayNodeIndex = Integer.parseInt(matcher.group(ARRAY_INDEX_GROUPID)); + + System.arraycopy(projection, 0, normalizedProjection, 0, projection.length - 1); + normalizedProjection[projection.length - 1] = this.arrayNodeName; + } else { + this.isArray = false; + this.arrayNodeName = null; + this.arrayNodeIndex = -1; + + System.arraycopy(projection, 0, normalizedProjection, 0, projection.length); + } + } + + /** + * @return Column's type + */ + public DataType getColumnType() { + return columnType; + } + + /** + * @return Returns the column name as defined in the HAWQ table. + */ + public String getColumnName() { + return columnName; + } + + /** + * If the column name contains dots (.) then this name is interpreted as path into the target json document pointing + * to nested json member. The leftmost path element stands for the root in the json document. + * + * @return If the column name contains dots (.) list of field names that represent the path from the root json node + * to the target nested node. + */ + public String[] getNormalizedProjections() { + return normalizedProjection; + } + + /** + * The 'jsonName[index]' column name conventions is used to point to a particular json array element. + * + * @return Returns the json index of the referred array element. 
+ */ + public int getArrayNodeIndex() { + return arrayNodeIndex; + } + + /** + * @return Returns true if the column name is a path to json array element and false otherwise. + */ + public boolean isArray() { + return isArray; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java new file mode 100644 index 0000000..8006273 --- /dev/null +++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonAccessor.java @@ -0,0 +1,84 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static org.apache.commons.lang3.StringUtils.isEmpty; + +import java.io.IOException; + +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.LineRecordReader; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.plugins.hdfs.HdfsSplittableDataAccessor; + +/** + * This JSON accessor for PXF will read JSON data and pass it to a {@link JsonResolver}. + * + * This accessor supports a single JSON record per line, or a multi-line JSON records if the <b>IDENTIFIER</b> parameter + * is set. + * + * When provided the <b>IDENTIFIER</b> indicates the member name used to determine the encapsulating json object to + * return. + */ +public class JsonAccessor extends HdfsSplittableDataAccessor { + + public static final String IDENTIFIER_PARAM = "IDENTIFIER"; + public static final String RECORD_MAX_LENGTH_PARAM = "MAXLENGTH"; + + /** + * If provided indicates the member name which will be used to determine the encapsulating json object to return. + */ + private String identifier = ""; + + /** + * Optional parameter that allows to define the max length of a json record. Records that exceed the allowed length + * are skipped. This parameter is applied only for the multi-line json records (e.g. when the IDENTIFIER is + * provided). + */ + private int maxRecordLength = Integer.MAX_VALUE; + + public JsonAccessor(InputData inputData) throws Exception { + // Because HdfsSplittableDataAccessor doesn't use the InputFormat we set it to null. + super(inputData, null); + + if (!isEmpty(inputData.getUserProperty(IDENTIFIER_PARAM))) { + + identifier = inputData.getUserProperty(IDENTIFIER_PARAM); + + // If the member identifier is set then check if a record max length is defined as well. 
+ if (!isEmpty(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM))) { + maxRecordLength = Integer.valueOf(inputData.getUserProperty(RECORD_MAX_LENGTH_PARAM)); + } + } + } + + @Override + protected Object getReader(JobConf conf, InputSplit split) throws IOException { + if (!isEmpty(identifier)) { + conf.set(JsonRecordReader.RECORD_MEMBER_IDENTIFIER, identifier); + conf.setInt(JsonRecordReader.RECORD_MAX_LENGTH, maxRecordLength); + return new JsonRecordReader(conf, (FileSplit) split); + } else { + return new LineRecordReader(conf, (FileSplit) split); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java new file mode 100644 index 0000000..26d4c82 --- /dev/null +++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java @@ -0,0 +1,176 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hawq.pxf.plugins.json.parser.PartitionedJsonParser; + +/** + * Multi-line json object reader. JsonRecordReader uses a member name (set by the <b>IDENTIFIER</b> PXF parameter) to + * determine the encapsulating object to extract and read. + * + * JsonRecordReader supports compressed input files as well. + * + * As a safe guard set the optional <b>MAXLENGTH</b> parameter to limit the max size of a record. + */ +public class JsonRecordReader implements RecordReader<LongWritable, Text> { + + private static final Log LOG = LogFactory.getLog(JsonRecordReader.class); + + public static final String RECORD_MEMBER_IDENTIFIER = "json.input.format.record.identifier"; + public static final String RECORD_MAX_LENGTH = "multilinejsonrecordreader.maxlength"; + + private CompressionCodecFactory compressionCodecs = null; + private long start; + private long pos; + private long end; + private int maxObjectLength; + private InputStream is; + private PartitionedJsonParser parser; + private final String jsonMemberName; + + /** + * Create new multi-line json object reader. 
+ * + * @param conf + * Hadoop context + * @param split + * HDFS split to start the reading from + * @throws IOException + */ + public JsonRecordReader(JobConf conf, FileSplit split) throws IOException { + + this.jsonMemberName = conf.get(RECORD_MEMBER_IDENTIFIER); + this.maxObjectLength = conf.getInt(RECORD_MAX_LENGTH, Integer.MAX_VALUE); + + start = split.getStart(); + end = start + split.getLength(); + final Path file = split.getPath(); + compressionCodecs = new CompressionCodecFactory(conf); + final CompressionCodec codec = compressionCodecs.getCodec(file); + + // open the file and seek to the start of the split + FileSystem fs = file.getFileSystem(conf); + FSDataInputStream fileIn = fs.open(split.getPath()); + if (codec != null) { + is = codec.createInputStream(fileIn); + start = 0; + end = Long.MAX_VALUE; + } else { + if (start != 0) { + fileIn.seek(start); + } + is = fileIn; + } + parser = new PartitionedJsonParser(is); + this.pos = start; + } + + /* + * {@inheritDoc} + */ + @Override + public boolean next(LongWritable key, Text value) throws IOException { + + while (pos < end) { + + String json = parser.nextObjectContainingMember(jsonMemberName); + pos = start + parser.getBytesRead(); + + if (json == null) { + return false; + } + + long jsonStart = pos - json.length(); + + // if the "begin-object" position is after the end of our split, we should ignore it + if (jsonStart >= end) { + return false; + } + + if (json.length() > maxObjectLength) { + LOG.warn("Skipped JSON object of size " + json.length() + " at pos " + jsonStart); + } else { + key.set(jsonStart); + value.set(json); + return true; + } + } + + return false; + } + + /* + * {@inheritDoc} + */ + @Override + public LongWritable createKey() { + return new LongWritable(); + } + + /* + * {@inheritDoc} + */ + @Override + public Text createValue() { + return new Text(); + } + + @Override + public long getPos() throws IOException { + return pos; + } + + /* + * {@inheritDoc} + */ + @Override + public 
synchronized void close() throws IOException { + if (is != null) { + is.close(); + } + } + + /* + * {@inheritDoc} + */ + @Override + public float getProgress() throws IOException { + if (start == end) { + return 0.0f; + } else { + return Math.min(1.0f, (pos - start) / (float) (end - start)); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java new file mode 100644 index 0000000..21db6b7 --- /dev/null +++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonResolver.java @@ -0,0 +1,256 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.ColumnDescriptor; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.api.utilities.Plugin; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.map.ObjectMapper; + +/** + * This JSON resolver for PXF will decode a given object from the {@link JsonAccessor} into a row for HAWQ. It will + * decode this data into a JsonNode and walk the tree for each column. It supports normal value mapping via projections + * and JSON array indexing. + */ +public class JsonResolver extends Plugin implements ReadResolver { + + private static final Log LOG = LogFactory.getLog(JsonResolver.class); + + private ArrayList<OneField> oneFieldList; + private ColumnDescriptorCache[] columnDescriptorCache; + private ObjectMapper mapper; + + /** + * Row with empty fields. Returned in case of broken or malformed json records. + */ + private final List<OneField> emptyRow; + + public JsonResolver(InputData inputData) throws Exception { + super(inputData); + oneFieldList = new ArrayList<OneField>(); + mapper = new ObjectMapper(new JsonFactory()); + + // Precompute the column metadata. The metadata is used for mapping column names to json nodes. 
+ columnDescriptorCache = new ColumnDescriptorCache[inputData.getColumns()]; + for (int i = 0; i < inputData.getColumns(); ++i) { + ColumnDescriptor cd = inputData.getColumn(i); + columnDescriptorCache[i] = new ColumnDescriptorCache(cd); + } + + emptyRow = createEmptyRow(); + } + + @Override + public List<OneField> getFields(OneRow row) throws Exception { + oneFieldList.clear(); + + String jsonRecordAsText = row.getData().toString(); + + JsonNode root = decodeLineToJsonNode(jsonRecordAsText); + + if (root == null) { + LOG.warn("Return empty-fields row due to invalid JSON: " + jsonRecordAsText); + return emptyRow; + } + + // Iterate through the column definition and fetch our JSON data + for (ColumnDescriptorCache columnMetadata : columnDescriptorCache) { + + JsonNode node = getChildJsonNode(root, columnMetadata.getNormalizedProjections()); + + // If this node is null or missing, add a null value here + if (node == null || node.isMissingNode()) { + addNullField(columnMetadata.getColumnType()); + } else if (columnMetadata.isArray()) { + // If this column is an array index, ex. "tweet.hashtags[0]" + if (node.isArray()) { + // If the JSON node is an array, then add it to our list + addFieldFromJsonArray(columnMetadata.getColumnType(), node, columnMetadata.getArrayNodeIndex()); + } else { + throw new IllegalStateException(columnMetadata.getColumnName() + " is not an array node"); + } + } else { + // This column is not an array type + // Add the value to the record + addFieldFromJsonNode(columnMetadata.getColumnType(), node); + } + } + + return oneFieldList; + } + + /** + * @return Returns a row comprised of typed, empty fields. Used as a result of broken/malformed json records. 
+ */ + private List<OneField> createEmptyRow() { + ArrayList<OneField> emptyFieldList = new ArrayList<OneField>(); + for (ColumnDescriptorCache column : columnDescriptorCache) { + emptyFieldList.add(new OneField(column.getColumnType().getOID(), null)); + } + return emptyFieldList; + } + + /** + * Iterates down the root node to the child JSON node defined by the projs path. + * + * @param root + * node to to start the traversal from. + * @param projs + * defines the path from the root to the desired child node. + * @return Returns the child node defined by the root and projs path. + */ + private JsonNode getChildJsonNode(JsonNode root, String[] projs) { + + // Iterate through all the tokens to the desired JSON node + JsonNode node = root; + for (int j = 0; j < projs.length; ++j) { + node = node.path(projs[j]); + } + + return node; + } + + /** + * Iterates through the given JSON node to the proper index and adds the field of corresponding type + * + * @param type + * The {@link DataType} type + * @param node + * The JSON array node + * @param index + * The array index to iterate to + * @throws IOException + */ + private void addFieldFromJsonArray(DataType type, JsonNode node, int index) throws IOException { + + int count = 0; + boolean added = false; + for (Iterator<JsonNode> arrayNodes = node.getElements(); arrayNodes.hasNext();) { + JsonNode arrayNode = arrayNodes.next(); + + if (count == index) { + added = true; + addFieldFromJsonNode(type, arrayNode); + break; + } + + ++count; + } + + // if we reached the end of the array without adding a field, add null + if (!added) { + addNullField(type); + } + } + + /** + * Adds a field from a given JSON node value based on the {@link DataType} type. + * + * @param type + * The DataType type + * @param val + * The JSON node to extract the value. 
+ * @throws IOException + */ + private void addFieldFromJsonNode(DataType type, JsonNode val) throws IOException { + OneField oneField = new OneField(); + oneField.type = type.getOID(); + + if (val.isNull()) { + oneField.val = null; + } else { + switch (type) { + case BIGINT: + oneField.val = val.asLong(); + break; + case BOOLEAN: + oneField.val = val.asBoolean(); + break; + case CHAR: + oneField.val = val.asText().charAt(0); + break; + case BYTEA: + oneField.val = val.asText().getBytes(); + break; + case FLOAT8: + oneField.val = val.asDouble(); + break; + case REAL: + oneField.val = (float)val.asDouble(); + break; + case INTEGER: + oneField.val = val.asInt(); + break; + case SMALLINT: + oneField.val = (short)val.asInt(); + break; + case BPCHAR: + case TEXT: + case VARCHAR: + oneField.val = val.asText(); + break; + default: + throw new IOException("Unsupported type " + type); + } + } + + oneFieldList.add(oneField); + } + + /** + * Adds a null field of the given type. + * + * @param type + * The {@link DataType} type + */ + private void addNullField(DataType type) { + oneFieldList.add(new OneField(type.getOID(), null)); + } + + /** + * Converts the input line parameter into {@link JsonNode} instance. + * + * @param line + * JSON text + * @return Returns a {@link JsonNode} that represents the input line or null for invalid json. 
+ */ + private JsonNode decodeLineToJsonNode(String line) { + + try { + return mapper.readTree(line); + } catch (Exception e) { + LOG.error("Failed to parse JSON object", e); + return null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java new file mode 100644 index 0000000..616312d --- /dev/null +++ b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexer.java @@ -0,0 +1,175 @@ +package org.apache.hawq.pxf.plugins.json.parser; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + * A very loosey-goosey lexer that doesn't enforce any JSON structural rules + */ +public class JsonLexer { + + /** + * The current Lexer state. 
+ */ + private JsonLexerState state; + + /** + * Create a new lexer with {@link JsonLexerState#NULL} initial state + */ + public JsonLexer() { + this(JsonLexerState.NULL); + } + + /** + * Create a new lexer with initial state + * + * @param initState + * Lexer initial state + */ + public JsonLexer(JsonLexerState initState) { + state = initState; + } + + /** + * @return current lexer state + */ + public JsonLexerState getState() { + return state; + } + + /** + * Change Lexer's {@link #state} + * + * @param state + * New lexer state + */ + public void setState(JsonLexerState state) { + this.state = state; + } + + /** + * Represents the possible states of a cursor can take in a JSON document. + */ + public static enum JsonLexerState { + NULL, + + DONT_CARE, + + BEGIN_OBJECT, + + END_OBJECT, + + BEGIN_STRING, + + END_STRING, + + INSIDE_STRING, + + STRING_ESCAPE, + + VALUE_SEPARATOR, + + NAME_SEPARATOR, + + BEGIN_ARRAY, + + END_ARRAY, + + WHITESPACE + } + + /** + * Given the current lexer state and the next cursor position computes the next {@link #state}. 
+ * + * @param c + * next character the cursor is moved to + */ + public void lex(char c) { + switch (state) { + case NULL: + case BEGIN_OBJECT: + case END_OBJECT: + case BEGIN_ARRAY: + case END_ARRAY: + case END_STRING: + case VALUE_SEPARATOR: + case NAME_SEPARATOR: + case DONT_CARE: + case WHITESPACE: { + if (Character.isWhitespace(c)) { + state = JsonLexerState.WHITESPACE; + break; + } + switch (c) { + // value-separator (comma) + case ',': + state = JsonLexerState.VALUE_SEPARATOR; + break; + // name-separator (colon) + case ':': + state = JsonLexerState.NAME_SEPARATOR; + break; + // string + case '"': + state = JsonLexerState.BEGIN_STRING; + break; + // start-object + case '{': + state = JsonLexerState.BEGIN_OBJECT; + break; + // end-object + case '}': + state = JsonLexerState.END_OBJECT; + break; + // begin-array + case '[': + state = JsonLexerState.BEGIN_ARRAY; + break; + // end-array + case ']': + state = JsonLexerState.END_ARRAY; + break; + default: + state = JsonLexerState.DONT_CARE; + } + break; + } + case BEGIN_STRING: + case INSIDE_STRING: { + state = JsonLexerState.INSIDE_STRING; + // we will now enter the STRING state below + + switch (c) { + // end-string + case '"': + state = JsonLexerState.END_STRING; + break; + // escape + case '\\': + state = JsonLexerState.STRING_ESCAPE; + } + break; + } + case STRING_ESCAPE: { + state = JsonLexerState.INSIDE_STRING; + break; + } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java new file mode 100644 index 0000000..71ad449 --- /dev/null +++ 
b/pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParser.java @@ -0,0 +1,200 @@ +package org.apache.hawq.pxf.plugins.json.parser; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +import org.apache.hawq.pxf.plugins.json.parser.JsonLexer.JsonLexerState; + +/** + * A simple parser that can support reading JSON objects from a random point in JSON text. It reads from the supplied + * stream (which is assumed to be positioned at any arbitrary position inside some JSON text) until it finds the first + * JSON begin-object "{". From this point on it will keep reading JSON objects until it finds one containing a member + * string that the user supplies. + * <p/> + * It is not recommended to use this with JSON text where individual JSON objects that can be large (MB's or larger). 
+ */ +public class PartitionedJsonParser { + + private static final char BACKSLASH = '\\'; + private static final char START_BRACE = '{'; + private static final int EOF = -1; + private final InputStreamReader inputStreamReader; + private final JsonLexer lexer; + private long bytesRead = 0; + private boolean endOfStream = false; + + public PartitionedJsonParser(InputStream is) { + this.lexer = new JsonLexer(); + + // You need to wrap the InputStream with an InputStreamReader, so that it can encode the incoming byte stream as + // UTF-8 characters + this.inputStreamReader = new InputStreamReader(is, StandardCharsets.UTF_8); + } + + private boolean scanToFirstBeginObject() throws IOException { + // seek until we hit the first begin-object + char prev = ' '; + int i; + while ((i = inputStreamReader.read()) != EOF) { + char c = (char) i; + bytesRead++; + if (c == START_BRACE && prev != BACKSLASH) { + lexer.setState(JsonLexer.JsonLexerState.BEGIN_OBJECT); + return true; + } + prev = c; + } + endOfStream = true; + return false; + } + + private enum MemberSearchState { + FOUND_STRING_NAME, + + SEARCHING, + + IN_MATCHING_OBJECT + } + + private static final EnumSet<JsonLexerState> inStringStates = EnumSet.of(JsonLexerState.INSIDE_STRING, + JsonLexerState.STRING_ESCAPE); + + /** + * @param memberName + * Indicates the member name used to determine the encapsulating object to return. + * @return Returns next json object that contains a member attribute with name: memberName. Returns null if no such + * object is found or the end of the stream is reached. 
+ * @throws IOException + */ + public String nextObjectContainingMember(String memberName) throws IOException { + + if (endOfStream) { + return null; + } + + int i; + int objectCount = 0; + StringBuilder currentObject = new StringBuilder(); + StringBuilder currentString = new StringBuilder(); + MemberSearchState memberState = MemberSearchState.SEARCHING; + + List<Integer> objectStack = new ArrayList<Integer>(); + + if (!scanToFirstBeginObject()) { + return null; + } + currentObject.append(START_BRACE); + objectStack.add(0); + + while ((i = inputStreamReader.read()) != EOF) { + char c = (char) i; + bytesRead++; + + lexer.lex(c); + + currentObject.append(c); + + switch (memberState) { + case SEARCHING: + if (lexer.getState() == JsonLexerState.BEGIN_STRING) { + // we found the start of a string, so reset our string buffer + currentString.setLength(0); + } else if (inStringStates.contains(lexer.getState())) { + // we're still inside a string, so keep appending to our buffer + currentString.append(c); + } else if (lexer.getState() == JsonLexerState.END_STRING && memberName.equals(currentString.toString())) { + + if (objectStack.size() > 0) { + // we hit the end of the string and it matched the member name (yay) + memberState = MemberSearchState.FOUND_STRING_NAME; + currentString.setLength(0); + } + } else if (lexer.getState() == JsonLexerState.BEGIN_OBJECT) { + // we are searching and found a '{', so we reset the current object string + if (objectStack.size() == 0) { + currentObject.setLength(0); + currentObject.append(START_BRACE); + } + objectStack.add(currentObject.length() - 1); + } else if (lexer.getState() == JsonLexerState.END_OBJECT) { + if (objectStack.size() > 0) { + objectStack.remove(objectStack.size() - 1); + } + if (objectStack.size() == 0) { + currentObject.setLength(0); + } + } + break; + case FOUND_STRING_NAME: + // keep popping whitespaces until we hit a different token + if (lexer.getState() != JsonLexerState.WHITESPACE) { + if (lexer.getState() == 
JsonLexerState.NAME_SEPARATOR) { + // found our member! + memberState = MemberSearchState.IN_MATCHING_OBJECT; + objectCount = 0; + + if (objectStack.size() > 1) { + currentObject.delete(0, objectStack.get(objectStack.size() - 1)); + } + objectStack.clear(); + } else { + // we didn't find a value-separator (:), so our string wasn't a member string. keep searching + memberState = MemberSearchState.SEARCHING; + } + } + break; + case IN_MATCHING_OBJECT: + if (lexer.getState() == JsonLexerState.BEGIN_OBJECT) { + objectCount++; + } else if (lexer.getState() == JsonLexerState.END_OBJECT) { + objectCount--; + if (objectCount < 0) { + // we're done! we reached an "}" which is at the same level as the member we found + return currentObject.toString(); + } + } + break; + } + } + endOfStream = true; + return null; + } + + /** + * @return Returns the number of bytes read from the stream. + */ + public long getBytesRead() { + return bytesRead; + } + + /** + * @return Returns true if the end of the stream has been reached and false otherwise. + */ + public boolean isEndOfStream() { + return endOfStream; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java new file mode 100644 index 0000000..a8161c1 --- /dev/null +++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/JsonExtensionTest.java @@ -0,0 +1,272 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.Path; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.plugins.hdfs.HdfsDataFragmenter; +import org.apache.hawq.pxf.plugins.json.JsonAccessor; +import org.apache.hawq.pxf.plugins.json.JsonResolver; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class JsonExtensionTest extends PxfUnit { + + private static final String IDENTIFIER = JsonAccessor.IDENTIFIER_PARAM; + private List<Pair<String, DataType>> columnDefs = null; + private List<Pair<String, String>> extraParams = new ArrayList<Pair<String, String>>(); + private List<String> output = new ArrayList<String>(); + + @Before + public void before() { + + columnDefs = new ArrayList<Pair<String, DataType>>(); + + columnDefs.add(new Pair<String, DataType>("created_at", DataType.TEXT)); + columnDefs.add(new Pair<String, DataType>("id", DataType.BIGINT)); + columnDefs.add(new Pair<String, DataType>("text", DataType.TEXT)); + columnDefs.add(new Pair<String, DataType>("user.screen_name", DataType.TEXT)); + columnDefs.add(new Pair<String, DataType>("entities.hashtags[0]", DataType.TEXT)); + columnDefs.add(new Pair<String, 
DataType>("coordinates.coordinates[0]", DataType.FLOAT8)); + columnDefs.add(new Pair<String, DataType>("coordinates.coordinates[1]", DataType.FLOAT8)); + + output.clear(); + extraParams.clear(); + } + + @After + public void cleanup() throws Exception { + columnDefs.clear(); + } + + @Test + public void testCompressedMultilineJsonFile() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets.tar.gz"), output); + } + + @Test + public void testMaxRecordLength() throws Exception { + + // variable-size-objects.json contains 3 json objects but only 2 of them fit in the 27 byte length limitation + + extraParams.add(new Pair<String, String>(IDENTIFIER, "key666")); + extraParams.add(new Pair<String, String>("MAXLENGTH", "27")); + + columnDefs.clear(); + columnDefs.add(new Pair<String, DataType>("key666", DataType.TEXT)); + + output.add("small object1"); + // skip the large object2 XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX + output.add("small object3"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/variable-size-objects.json"), output); + } + + @Test + public void testDataTypes() throws Exception { + + // TDOO: The BYTEA type is not tested. The implementation (val.asText().getBytes()) returns an array reference + // and it is not clear whether this is the desired behavior. + // + // For the time being avoid using BYTEA type!!! + + // This test also verifies that the order of the columns in the table definition agnostic to the order of the + // json attributes. 
+ + extraParams.add(new Pair<String, String>(IDENTIFIER, "bintType")); + + columnDefs.clear(); + + columnDefs.add(new Pair<String, DataType>("text", DataType.TEXT)); + columnDefs.add(new Pair<String, DataType>("varcharType", DataType.VARCHAR)); + columnDefs.add(new Pair<String, DataType>("bpcharType", DataType.BPCHAR)); + columnDefs.add(new Pair<String, DataType>("smallintType", DataType.SMALLINT)); + columnDefs.add(new Pair<String, DataType>("integerType", DataType.INTEGER)); + columnDefs.add(new Pair<String, DataType>("realType", DataType.REAL)); + columnDefs.add(new Pair<String, DataType>("float8Type", DataType.FLOAT8)); + // The DataType.BYTEA type is left out for further validation. + columnDefs.add(new Pair<String, DataType>("charType", DataType.CHAR)); + columnDefs.add(new Pair<String, DataType>("booleanType", DataType.BOOLEAN)); + columnDefs.add(new Pair<String, DataType>("bintType", DataType.BIGINT)); + + output.add(",varcharType,bpcharType,777,999,3.15,3.14,x,true,666"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/datatypes-test.json"), output); + } + + @Test(expected = IllegalStateException.class) + public void testMissingArrayJsonAttribute() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + columnDefs.clear(); + + columnDefs.add(new Pair<String, DataType>("created_at", DataType.TEXT)); + // User is not an array! An attempt to access it should throw an exception! 
+ columnDefs.add(new Pair<String, DataType>("user[0]", DataType.TEXT)); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-with-missing-text-attribtute.json"), output); + } + + @Test + public void testMissingJsonAttribute() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + // Missing attributes are substituted by an empty field + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,,SpreadButter,tweetCongress,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-with-missing-text-attribtute.json"), output); + } + + @Test + public void testMalformedJsonObject() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add(",,,,,,"); // Expected: malformed json records are transformed into empty rows + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-broken.json"), output); + } + + @Test + public void testSmallTweets() throws Exception { + + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + output.add("Fri Jun 07 22:45:03 +0000 2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-small.json"), output); + } + + @Test + public void testTweetsWithNull() throws Exception { + + output.add("Fri Jun 07 22:45:02 +0000 
2013,,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,,text2,patronusdeadly,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/null-tweets.json"), output); + } + + @Test + public void testSmallTweetsWithDelete() throws Exception { + + output.add(",,,,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-small-with-delete.json"), output); + } + + @Test + public void testWellFormedJson() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-pp.json"), output); + } + + @Test + public void testWellFormedJsonWithDelete() throws Exception { + + extraParams.add(new Pair<String, String>(IDENTIFIER, "created_at")); + + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-pp-with-delete.json"), output); + } + + @Test + public void testMultipleFiles() throws Exception { + + 
output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + output.add("Fri Jun 07 22:45:03 +0000 2013,343136551322136576,text4,SevenStonesBuoy,,-6.1,50.103"); + output.add(",,,,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547115253761,text1,SpreadButter,tweetCongress,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547123646465,text2,patronusdeadly,,,"); + output.add("Fri Jun 07 22:45:02 +0000 2013,343136547136233472,text3,NoSecrets_Vagas,,,"); + + super.assertUnorderedOutput(new Path(System.getProperty("user.dir") + File.separator + + "src/test/resources/tweets-small*.json"), output); + } + + @Override + public List<Pair<String, String>> getExtraParams() { + return extraParams; + } + + @Override + public Class<? extends Fragmenter> getFragmenterClass() { + return HdfsDataFragmenter.class; + } + + @Override + public Class<? extends ReadAccessor> getReadAccessorClass() { + return JsonAccessor.class; + } + + @Override + public Class<? 
extends ReadResolver> getReadResolverClass() { + return JsonResolver.class; + } + + @Override + public List<Pair<String, DataType>> getColumnDefinitions() { + return columnDefs; + } +} http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java new file mode 100644 index 0000000..5669882 --- /dev/null +++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/PxfUnit.java @@ -0,0 +1,666 @@ +package org.apache.hawq.pxf.plugins.json; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.security.InvalidParameterException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hawq.pxf.api.Fragment; +import org.apache.hawq.pxf.api.Fragmenter; +import org.apache.hawq.pxf.api.OneField; +import org.apache.hawq.pxf.api.OneRow; +import org.apache.hawq.pxf.api.ReadAccessor; +import org.apache.hawq.pxf.api.ReadResolver; +import org.apache.hawq.pxf.api.WriteAccessor; +import org.apache.hawq.pxf.api.WriteResolver; +import org.apache.hawq.pxf.api.io.DataType; +import org.apache.hawq.pxf.api.utilities.InputData; +import org.apache.hawq.pxf.service.FragmentsResponse; +import org.apache.hawq.pxf.service.FragmentsResponseFormatter; +import org.apache.hawq.pxf.service.utilities.ProtocolData; +import org.codehaus.jackson.JsonFactory; +import org.codehaus.jackson.JsonNode; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.junit.Assert; + +/** + * This abstract class contains a number of helpful utilities in developing a PXF extension for HAWQ. Extend this class + * and use the various <code>assert</code> methods to check given input against known output. 
+ */ +public abstract class PxfUnit { + + private static final Log LOG = LogFactory.getLog(PxfUnit.class); + + private static JsonFactory factory = new JsonFactory(); + private static ObjectMapper mapper = new ObjectMapper(factory); + + protected static List<InputData> inputs = null; + + /** + * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the file for + * output testing. + * + * @param input + * Input records + * @param expectedOutput + * File containing output to check + * @throws Exception + */ + public void assertOutput(Path input, Path expectedOutput) throws Exception { + + BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open( + expectedOutput))); + + List<String> outputLines = new ArrayList<String>(); + + String line; + while ((line = rdr.readLine()) != null) { + outputLines.add(line); + } + + assertOutput(input, outputLines); + + rdr.close(); + } + + /** + * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given + * parameter for output testing. + * + * @param input + * Input records + * @param expectedOutput + * File containing output to check + * @throws Exception + */ + public void assertOutput(Path input, List<String> expectedOutput) throws Exception { + + setup(input); + List<String> actualOutput = new ArrayList<String>(); + for (InputData data : inputs) { + ReadAccessor accessor = getReadAccessor(data); + ReadResolver resolver = getReadResolver(data); + + actualOutput.addAll(getAllOutput(accessor, resolver)); + } + + Assert.assertFalse("Output did not match expected output", compareOutput(expectedOutput, actualOutput)); + } + + /** + * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the given + * parameter for output testing.<br> + * <br> + * Ignores order of records. 
+ * + * @param input + * Input records + * @param expectedOutput + * File containing output to check + * @throws Exception + */ + public void assertUnorderedOutput(Path input, Path expectedOutput) throws Exception { + BufferedReader rdr = new BufferedReader(new InputStreamReader(FileSystem.get(new Configuration()).open( + expectedOutput))); + + List<String> outputLines = new ArrayList<String>(); + + String line; + while ((line = rdr.readLine()) != null) { + outputLines.add(line); + } + + assertUnorderedOutput(input, outputLines); + rdr.close(); + } + + /** + * Uses the given input directory to run through the PXF unit testing framework. Uses the lines in the file for + * output testing.<br> + * <br> + * Ignores order of records. + * + * @param input + * Input records + * @param expectedOutput + * File containing output to check + * @throws Exception + */ + public void assertUnorderedOutput(Path input, List<String> expectedOutput) throws Exception { + + setup(input); + + List<String> actualOutput = new ArrayList<String>(); + for (InputData data : inputs) { + ReadAccessor accessor = getReadAccessor(data); + ReadResolver resolver = getReadResolver(data); + + actualOutput.addAll(getAllOutput(accessor, resolver)); + } + + Assert.assertFalse("Output did not match expected output", compareUnorderedOutput(expectedOutput, actualOutput)); + } + + /** + * Writes the output to the given output stream. Comma delimiter. + * + * @param input + * The input file + * @param output + * The output stream + * @throws Exception + */ + public void writeOutput(Path input, OutputStream output) throws Exception { + + setup(input); + + for (InputData data : inputs) { + ReadAccessor accessor = getReadAccessor(data); + ReadResolver resolver = getReadResolver(data); + + for (String line : getAllOutput(accessor, resolver)) { + output.write((line + "\n").getBytes()); + } + } + + output.flush(); + } + + /** + * Get the class of the implementation of Fragmenter to be tested. 
+ * + * @return The class + */ + public Class<? extends Fragmenter> getFragmenterClass() { + return null; + } + + /** + * Get the class of the implementation of ReadAccessor to be tested. + * + * @return The class + */ + public Class<? extends ReadAccessor> getReadAccessorClass() { + return null; + } + + /** + * Get the class of the implementation of WriteAccessor to be tested. + * + * @return The class + */ + public Class<? extends WriteAccessor> getWriteAccessorClass() { + return null; + } + + /** + * Get the class of the implementation of Resolver to be tested. + * + * @return The class + */ + public Class<? extends ReadResolver> getReadResolverClass() { + return null; + } + + /** + * Get the class of the implementation of WriteResolver to be tested. + * + * @return The class + */ + public Class<? extends WriteResolver> getWriteResolverClass() { + return null; + } + + /** + * Get any extra parameters that are meant to be specified for the "pxf" protocol. Note that "X-GP-" is prepended to + * each parameter name. + * + * @return Any extra parameters or null if none. + */ + public List<Pair<String, String>> getExtraParams() { + return null; + } + + /** + * Gets the column definition names and data types. Types are DataType objects + * + * @return A list of column definition name value pairs. Cannot be null. 
+ */ + public abstract List<Pair<String, DataType>> getColumnDefinitions(); + + protected InputData getInputDataForWritableTable() { + return getInputDataForWritableTable(null); + } + + protected InputData getInputDataForWritableTable(Path input) { + + if (getWriteAccessorClass() == null) { + throw new IllegalArgumentException( + "getWriteAccessorClass() must be overwritten to return a non-null object"); + } + + if (getWriteResolverClass() == null) { + throw new IllegalArgumentException( + "getWriteResolverClass() must be overwritten to return a non-null object"); + } + + Map<String, String> paramsMap = new HashMap<String, String>(); + + paramsMap.put("X-GP-ALIGNMENT", "what"); + paramsMap.put("X-GP-SEGMENT-ID", "1"); + paramsMap.put("X-GP-HAS-FILTER", "0"); + paramsMap.put("X-GP-SEGMENT-COUNT", "1"); + + paramsMap.put("X-GP-FORMAT", "GPDBWritable"); + paramsMap.put("X-GP-URL-HOST", "localhost"); + paramsMap.put("X-GP-URL-PORT", "50070"); + + if (input == null) { + paramsMap.put("X-GP-DATA-DIR", "/dummydata"); + } + + List<Pair<String, DataType>> params = getColumnDefinitions(); + paramsMap.put("X-GP-ATTRS", Integer.toString(params.size())); + for (int i = 0; i < params.size(); ++i) { + paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first); + paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name()); + paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID())); + } + + paramsMap.put("X-GP-ACCESSOR", getWriteAccessorClass().getName()); + paramsMap.put("X-GP-RESOLVER", getWriteResolverClass().getName()); + + if (getExtraParams() != null) { + for (Pair<String, String> param : getExtraParams()) { + paramsMap.put("X-GP-" + param.first, param.second); + } + } + + return new ProtocolData(paramsMap); + } + + /** + * Set all necessary parameters for GPXF framework to function. Uses the given path as a single input split. + * + * @param input + * The input path, relative or absolute. 
	 * @throws Exception
	 */
	protected void setup(Path input) throws Exception {

		if (getFragmenterClass() == null) {
			throw new IllegalArgumentException("getFragmenterClass() must be overwritten to return a non-null object");
		}

		if (getReadAccessorClass() == null) {
			throw new IllegalArgumentException("getReadAccessorClass() must be overwritten to return a non-null object");
		}

		if (getReadResolverClass() == null) {
			throw new IllegalArgumentException("getReadResolverClass() must be overwritten to return a non-null object");
		}

		Map<String, String> paramsMap = new HashMap<String, String>();

		// 2.1.0 Properties
		// HDMetaData parameters
		paramsMap.put("X-GP-ALIGNMENT", "what");
		paramsMap.put("X-GP-SEGMENT-ID", "1");
		paramsMap.put("X-GP-HAS-FILTER", "0");
		paramsMap.put("X-GP-SEGMENT-COUNT", "1");
		paramsMap.put("X-GP-FRAGMENTER", getFragmenterClass().getName());
		paramsMap.put("X-GP-FORMAT", "GPDBWritable");
		paramsMap.put("X-GP-URL-HOST", "localhost");
		paramsMap.put("X-GP-URL-PORT", "50070");

		paramsMap.put("X-GP-DATA-DIR", input.toString());

		// One X-GP-ATTR-* triple (name, type name, type OID) per column definition.
		List<Pair<String, DataType>> params = getColumnDefinitions();
		paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
		for (int i = 0; i < params.size(); ++i) {
			paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
			paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
			paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
		}

		// HDFSMetaData properties
		paramsMap.put("X-GP-ACCESSOR", getReadAccessorClass().getName());
		paramsMap.put("X-GP-RESOLVER", getReadResolverClass().getName());

		if (getExtraParams() != null) {
			for (Pair<String, String> param : getExtraParams()) {
				paramsMap.put("X-GP-" + param.first, param.second);
			}
		}

		LocalInputData fragmentInputData = new LocalInputData(paramsMap);

		// Ask the fragmenter to split the input, then serialize the fragments the same way the PXF
		// REST layer would, so we can re-parse them as the server's JSON response below.
		List<Fragment> fragments = getFragmenter(fragmentInputData).getFragments();

		FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, input.toString());

		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		fragmentsResponse.write(baos);

		String jsonOutput = baos.toString();

		inputs = new ArrayList<InputData>();

		JsonNode node = decodeLineToJsonNode(jsonOutput);

		// Build one InputData per fragment in the "PXFFragments" array; the source name is forced
		// absolute here (LocalInputData later strips the leading '/' for local-filesystem access).
		JsonNode fragmentsArray = node.get("PXFFragments");
		int i = 0;
		Iterator<JsonNode> iter = fragmentsArray.getElements();
		while (iter.hasNext()) {
			JsonNode fragNode = iter.next();
			String sourceData = fragNode.get("sourceName").getTextValue();
			if (!sourceData.startsWith("/")) {
				sourceData = "/" + sourceData;
			}
			paramsMap.put("X-GP-DATA-DIR", sourceData);
			paramsMap.put("X-GP-FRAGMENT-METADATA", fragNode.get("metadata").getTextValue());
			paramsMap.put("X-GP-DATA-FRAGMENT", Integer.toString(i++));
			inputs.add(new LocalInputData(paramsMap));
		}
	}

	/**
	 * Parses one line of JSON text into a tree.
	 *
	 * @param line
	 *            The JSON text to parse.
	 * @return The parsed tree, or null if parsing failed. NOTE(review): the failure is only warned and
	 *         swallowed; callers (setup) will NPE on the null — intentional for a test harness, but worth
	 *         knowing when a test blows up here.
	 */
	private JsonNode decodeLineToJsonNode(String line) {
		try {
			return mapper.readTree(line);
		} catch (Exception e) {
			LOG.warn(e);
			return null;
		}
	}

	/**
	 * Compares the expected and actual output, requiring matching lines to also be in the same position,
	 * and logging every discrepancy.
	 *
	 * @param expectedOutput
	 *            The expected output
	 * @param actualOutput
	 *            The actual output
	 * @return true if any error was found (missing, unexpected, or out-of-place line); false when the
	 *         outputs match exactly. (Note: despite the historical wording, the return value signals
	 *         errors, not success.)
	 */
	protected boolean compareOutput(List<String> expectedOutput, List<String> actualOutput) {

		boolean error = false;
		// Pass 1: every expected line must appear, and at the same index.
		for (int i = 0; i < expectedOutput.size(); ++i) {
			boolean match = false;
			for (int j = 0; j < actualOutput.size(); ++j) {
				if (expectedOutput.get(i).equals(actualOutput.get(j))) {
					match = true;
					if (i != j) {
						LOG.error("Expected (" + expectedOutput.get(i) + ") matched (" + actualOutput.get(j)
								+ ") but in wrong place. " + j + " instead of " + i);
						error = true;
					}

					break;
				}
			}

			if (!match) {
				LOG.error("Missing expected output: (" + expectedOutput.get(i) + ")");
				error = true;
			}
		}

		// Pass 2: every actual line must have been expected.
		for (int i = 0; i < actualOutput.size(); ++i) {
			boolean match = false;
			for (int j = 0; j < expectedOutput.size(); ++j) {
				if (actualOutput.get(i).equals(expectedOutput.get(j))) {
					match = true;
					break;
				}
			}

			if (!match) {
				LOG.error("Received unexpected output: (" + actualOutput.get(i) + ")");
				error = true;
			}
		}

		return error;
	}

	/**
	 * Compares the expected and actual output ignoring ordering, logging every discrepancy.
	 *
	 * @param expectedOutput
	 *            The expected output
	 * @param actualOutput
	 *            The actual output
	 * @return true if any error was found (missing or unexpected line); false when the outputs match as
	 *         unordered collections. (Note: true signals failure, not success.)
	 */
	protected boolean compareUnorderedOutput(List<String> expectedOutput, List<String> actualOutput) {

		boolean error = false;
		// Pass 1: every expected line must appear somewhere in the actual output.
		for (int i = 0; i < expectedOutput.size(); ++i) {
			boolean match = false;
			for (int j = 0; j < actualOutput.size(); ++j) {
				if (expectedOutput.get(i).equals(actualOutput.get(j))) {
					match = true;
					break;
				}
			}

			if (!match) {
				LOG.error("Missing expected output: (" + expectedOutput.get(i) + ")");
				error = true;
			}
		}

		// Pass 2: every actual line must have been expected.
		for (int i = 0; i < actualOutput.size(); ++i) {
			boolean match = false;
			for (int j = 0; j < expectedOutput.size(); ++j) {
				if (actualOutput.get(i).equals(expectedOutput.get(j))) {
					match = true;
					break;
				}
			}

			if (!match) {
				LOG.error("Received unexpected output: (" + actualOutput.get(i) + ")");
				error = true;
			}
		}

		return error;
	}

	/**
	 * Opens the accessor and reads all output, giving it to the resolver to retrieve the list of fields. These fields
	 * are then added to a string, delimited by commas, and returned in a list.
+ * + * @param accessor + * The accessor instance to use + * @param resolver + * The resolver instance to use + * @return The list of output strings + * @throws Exception + */ + protected List<String> getAllOutput(ReadAccessor accessor, ReadResolver resolver) throws Exception { + + Assert.assertTrue("Accessor failed to open", accessor.openForRead()); + + List<String> output = new ArrayList<String>(); + + OneRow row = null; + while ((row = accessor.readNextObject()) != null) { + + StringBuilder bldr = new StringBuilder(); + for (OneField field : resolver.getFields(row)) { + bldr.append((field != null && field.val != null ? field.val : "") + ","); + } + + if (bldr.length() > 0) { + bldr.deleteCharAt(bldr.length() - 1); + } + + output.add(bldr.toString()); + } + + accessor.closeForRead(); + + return output; + } + + /** + * Gets an instance of Fragmenter via reflection. + * + * Searches for a constructor that has a single parameter of some BaseMetaData type + * + * @return A Fragmenter instance + * @throws Exception + * If something bad happens + */ + protected Fragmenter getFragmenter(InputData meta) throws Exception { + + Fragmenter fragmenter = null; + + for (Constructor<?> c : getFragmenterClass().getConstructors()) { + if (c.getParameterTypes().length == 1) { + for (Class<?> clazz : c.getParameterTypes()) { + if (InputData.class.isAssignableFrom(clazz)) { + fragmenter = (Fragmenter) c.newInstance(meta); + } + } + } + } + + if (fragmenter == null) { + throw new InvalidParameterException("Unable to find Fragmenter constructor with a BaseMetaData parameter"); + } + + return fragmenter; + + } + + /** + * Gets an instance of ReadAccessor via reflection. 
+ * + * Searches for a constructor that has a single parameter of some InputData type + * + * @return An ReadAccessor instance + * @throws Exception + * If something bad happens + */ + protected ReadAccessor getReadAccessor(InputData data) throws Exception { + + ReadAccessor accessor = null; + + for (Constructor<?> c : getReadAccessorClass().getConstructors()) { + if (c.getParameterTypes().length == 1) { + for (Class<?> clazz : c.getParameterTypes()) { + if (InputData.class.isAssignableFrom(clazz)) { + accessor = (ReadAccessor) c.newInstance(data); + } + } + } + } + + if (accessor == null) { + throw new InvalidParameterException("Unable to find Accessor constructor with a BaseMetaData parameter"); + } + + return accessor; + + } + + /** + * Gets an instance of IFieldsResolver via reflection. + * + * Searches for a constructor that has a single parameter of some BaseMetaData type + * + * @return A IFieldsResolver instance + * @throws Exception + * If something bad happens + */ + protected ReadResolver getReadResolver(InputData data) throws Exception { + + ReadResolver resolver = null; + + // search for a constructor that has a single parameter of a type of + // BaseMetaData to create the accessor instance + for (Constructor<?> c : getReadResolverClass().getConstructors()) { + if (c.getParameterTypes().length == 1) { + for (Class<?> clazz : c.getParameterTypes()) { + if (InputData.class.isAssignableFrom(clazz)) { + resolver = (ReadResolver) c.newInstance(data); + } + } + } + } + + if (resolver == null) { + throw new InvalidParameterException("Unable to find Resolver constructor with a BaseMetaData parameter"); + } + + return resolver; + } + + public static class Pair<FIRST, SECOND> { + + public FIRST first; + public SECOND second; + + public Pair() { + } + + public Pair(FIRST f, SECOND s) { + this.first = f; + this.second = s; + } + } + + /** + * An extension of InputData for the local file system instead of HDFS. Leveraged by the PXFUnit framework. 
Do not + * concern yourself with such a simple piece of code. + */ + public static class LocalInputData extends ProtocolData { + + public LocalInputData(ProtocolData copy) { + super(copy); + super.setDataSource(super.getDataSource().substring(1)); + } + + public LocalInputData(Map<String, String> paramsMap) { + super(paramsMap); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java new file mode 100644 index 0000000..aa8ea14 --- /dev/null +++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/JsonLexerTest.java @@ -0,0 +1,141 @@ +package org.apache.hawq.pxf.plugins.json.parser; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; +import java.util.List; +import java.util.ListIterator; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +public class JsonLexerTest { + + private static final Log LOG = LogFactory.getLog(JsonLexerTest.class); + + @Test + public void testSimple() throws IOException { + File testsDir = new File("src/test/resources/lexer-tests"); + File[] jsonFiles = testsDir.listFiles(new FilenameFilter() { + public boolean accept(File file, String s) { + return s.endsWith(".json"); + } + }); + + for (File jsonFile : jsonFiles) { + File stateFile = new File(jsonFile.getAbsolutePath() + ".state"); + if (stateFile.exists()) { + runTest(jsonFile, stateFile); + } + } + } + + public static Pattern STATE_RECURRENCE = Pattern.compile("^([A-Za-z\\_0-9]+)\\{([0-9]+)\\}$"); + + public void runTest(File jsonFile, File stateFile) throws IOException { + List<String> lexerStates = FileUtils.readLines(stateFile); + InputStream jsonInputStream = new FileInputStream(jsonFile); + + try { + JsonLexer lexer = new JsonLexer(); + + int byteOffset = 0; + int i; + ListIterator<String> stateIterator = lexerStates.listIterator(); + int recurrence = 0; + JsonLexer.JsonLexerState expectedState = null; + StringBuilder sb = new StringBuilder(); + int stateFileLineNum = 0; + while ((i = jsonInputStream.read()) != -1) { + byteOffset++; + char c = (char) i; + + sb.append(c); + + lexer.lex(c); + + if (lexer.getState() == JsonLexer.JsonLexerState.WHITESPACE) { + // optimization to skip over multiple whitespaces + continue; + } + + if (!stateIterator.hasNext()) { + 
assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum) + + ": Input stream had character '" + c + "' but no matching state", true); + } + + if (recurrence <= 0) { + String state = stateIterator.next().trim(); + stateFileLineNum++; + + while (state.equals("") || state.startsWith("#")) { + if (!stateIterator.hasNext()) { + assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum) + + ": Input stream had character '" + c + "' but no matching state", true); + } + state = stateIterator.next().trim(); + stateFileLineNum++; + } + + Matcher m = STATE_RECURRENCE.matcher(state); + recurrence = 1; + if (m.matches()) { + state = m.group(1); + recurrence = Integer.valueOf(m.group(2)); + } + expectedState = JsonLexer.JsonLexerState.valueOf(state); + } + + assertEquals(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum) + + ": Issue for char '" + c + "'", expectedState, lexer.getState()); + recurrence--; + } + + if (stateIterator.hasNext()) { + assertFalse(formatStateInfo(jsonFile, sb.toString(), byteOffset, stateFileLineNum) + + ": Input stream has ended but more states were expected: '" + stateIterator.next() + "...'", + true); + } + + } finally { + IOUtils.closeQuietly(jsonInputStream); + } + + LOG.info("File " + jsonFile.getName() + " passed"); + + } + + static String formatStateInfo(File jsonFile, String streamContents, int streamByteOffset, int stateFileLineNum) { + return jsonFile.getName() + ": Input stream currently at byte-offset " + streamByteOffset + ", contents = '" + + streamContents + "'" + " state-file line = " + stateFileLineNum; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java ---------------------------------------------------------------------- diff --git 
a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java new file mode 100644 index 0000000..cdc876b --- /dev/null +++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserNoSeekTest.java @@ -0,0 +1,83 @@ +package org.apache.hawq.pxf.plugins.json.parser; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FilenameFilter; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.junit.Test; + +public class PartitionedJsonParserNoSeekTest { + + private static final Log LOG = LogFactory.getLog(PartitionedJsonParserNoSeekTest.class); + + @Test + public void testNoSeek() throws IOException { + File testsDir = new File("src/test/resources/parser-tests/noseek"); + File[] jsonFiles = testsDir.listFiles(new FilenameFilter() { + public boolean accept(File file, String s) { + return s.endsWith(".json"); + } + }); + + for (File jsonFile : jsonFiles) { + runTest(jsonFile); + } + } + + public void runTest(final File jsonFile) throws IOException { + InputStream jsonInputStream = new FileInputStream(jsonFile); + + try { + PartitionedJsonParser parser = new PartitionedJsonParser(jsonInputStream); + + File[] jsonOjbectFiles = jsonFile.getParentFile().listFiles(new FilenameFilter() { + public boolean accept(File file, String s) { + return s.contains(jsonFile.getName()) && s.contains("expected"); + } + }); + + for (File jsonObjectFile : jsonOjbectFiles) { + String expected = trimWhitespaces(FileUtils.readFileToString(jsonObjectFile)); + String result = parser.nextObjectContainingMember("name"); + assertNotNull(jsonFile.getName() + "/" + jsonObjectFile.getName(), result); + assertEquals(jsonFile.getName() + "/" + jsonObjectFile.getName(), expected, trimWhitespaces(result)); + LOG.info("File " + jsonFile.getName() + "/" + jsonObjectFile.getName() + " passed"); + } + + } finally { + IOUtils.closeQuietly(jsonInputStream); + } + } + + public String trimWhitespaces(String s) { + return s.replaceAll("[\\n\\t\\r \\t]+", " ").trim(); + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/fd9c3686/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java ---------------------------------------------------------------------- diff --git a/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java new file mode 100644 index 0000000..3a1b3b6 --- /dev/null +++ b/pxf/pxf-json/src/test/java/org/apache/hawq/pxf/plugins/json/parser/PartitionedJsonParserOffsetTest.java @@ -0,0 +1,55 @@ +package org.apache.hawq.pxf.plugins.json.parser; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 */

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

import org.junit.Test;

/**
 * Verifies that PartitionedJsonParser reports correct cumulative byte counts while extracting
 * consecutive objects from a JSON array streamed from memory.
 */
public class PartitionedJsonParserOffsetTest {
	/*
	 * [{"color": "red","v": "vv"},{"color": "red","v": "vv"}]
	 */
	public static String json2 = "[{\"color\": \"red\",\"v\": \"vv\"},{\"color\": \"red\",\"v\": \"vv\"}]";

	@Test
	public void testOffset() throws IOException {
		InputStream jsonInputStream = createFromString(json2);
		PartitionedJsonParser parser = new PartitionedJsonParser(jsonInputStream);
		String result = parser.nextObjectContainingMember("color");
		assertNotNull(result);
		// Each object literal is 26 bytes. After the first extraction the parser has consumed the
		// leading '[' plus the 26-byte object = 27 bytes, of which exactly 1 is not part of the result.
		assertEquals(27, parser.getBytesRead());
		assertEquals(1, parser.getBytesRead() - result.length());
		result = parser.nextObjectContainingMember("color");
		assertNotNull(result);
		// Second extraction adds the separating ',' plus another 26-byte object: 27 + 1 + 26 = 54 bytes,
		// 28 of which are overhead relative to the returned object text.
		assertEquals(54, parser.getBytesRead());
		assertEquals(28, parser.getBytesRead() - result.length());
		jsonInputStream.close();
	}

	// Wraps the string's platform-default-encoded bytes in an in-memory stream.
	public InputStream createFromString(String s) {
		return new ByteArrayInputStream(s.getBytes());
	}
}
