[
https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15124334#comment-15124334
]
ASF GitHub Bot commented on HAWQ-178:
-------------------------------------
Github user hornn commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/302#discussion_r51324302
--- Diff:
pxf/pxf-json/src/test/java/org/apache/pxf/hawq/plugins/json/PxfUnit.java ---
@@ -0,0 +1,666 @@
+package org.apache.pxf.hawq.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.BufferedReader;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.lang.reflect.Constructor;
+import java.security.InvalidParameterException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hawq.pxf.api.Fragment;
+import org.apache.hawq.pxf.api.Fragmenter;
+import org.apache.hawq.pxf.api.OneField;
+import org.apache.hawq.pxf.api.OneRow;
+import org.apache.hawq.pxf.api.ReadAccessor;
+import org.apache.hawq.pxf.api.ReadResolver;
+import org.apache.hawq.pxf.api.WriteAccessor;
+import org.apache.hawq.pxf.api.WriteResolver;
+import org.apache.hawq.pxf.api.io.DataType;
+import org.apache.hawq.pxf.api.utilities.InputData;
+import org.apache.hawq.pxf.service.FragmentsResponse;
+import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
+import org.apache.hawq.pxf.service.utilities.ProtocolData;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonNode;
+import org.codehaus.jackson.JsonParseException;
+import org.codehaus.jackson.map.JsonMappingException;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Assert;
+
+/**
+ * This abstract class contains a number of helpful utilities in
developing a PXF extension for HAWQ. Extend this class
+ * and use the various <code>assert</code> methods to check given input
against known output.
+ */
+public abstract class PxfUnit {
+
    // Shared Jackson plumbing used to parse the fragmenter's JSON response built in setup().
    // (mapper is presumably consumed by decodeLineToJsonNode(), whose body is not visible in
    // this chunk -- confirm.)
    private static JsonFactory factory = new JsonFactory();
    private static ObjectMapper mapper = new ObjectMapper(factory);

    // Input splits produced by the most recent call to setup(); one InputData per fragment.
    protected static List<InputData> inputs = null;
+
+ /**
+ * Uses the given input directory to run through the PXF unit testing
framework. Uses the lines in the file for
+ * output testing.
+ *
+ * @param input
+ * Input records
+ * @param expectedOutput
+ * File containing output to check
+ * @throws Exception
+ */
+ public void assertOutput(Path input, Path expectedOutput) throws
Exception {
+
+ BufferedReader rdr = new BufferedReader(new
InputStreamReader(FileSystem.get(new Configuration()).open(
+ expectedOutput)));
+
+ List<String> outputLines = new ArrayList<String>();
+
+ String line;
+ while ((line = rdr.readLine()) != null) {
+ outputLines.add(line);
+ }
+
+ assertOutput(input, outputLines);
+ }
+
+ /**
+ * Uses the given input directory to run through the PXF unit testing
framework. Uses the lines in the given
+ * parameter for output testing.
+ *
+ * @param input
+ * Input records
+ * @param expectedOutput
+ * File containing output to check
+ * @throws Exception
+ */
+ public void assertOutput(Path input, List<String> expectedOutput)
throws Exception {
+
+ setup(input);
+ List<String> actualOutput = new ArrayList<String>();
+ for (InputData data : inputs) {
+ ReadAccessor accessor = getReadAccessor(data);
+ ReadResolver resolver = getReadResolver(data);
+
+ actualOutput.addAll(getAllOutput(accessor, resolver));
+ }
+
+ Assert.assertFalse("Output did not match expected output",
compareOutput(expectedOutput, actualOutput));
+ }
+
+ /**
+ * Uses the given input directory to run through the PXF unit testing
framework. Uses the lines in the given
+ * parameter for output testing.<br>
+ * <br>
+ * Ignores order of records.
+ *
+ * @param input
+ * Input records
+ * @param expectedOutput
+ * File containing output to check
+ * @throws Exception
+ */
+ public void assertUnorderedOutput(Path input, Path expectedOutput)
throws Exception {
+ BufferedReader rdr = new BufferedReader(new
InputStreamReader(FileSystem.get(new Configuration()).open(
+ expectedOutput)));
+
+ List<String> outputLines = new ArrayList<String>();
+
+ String line;
+ while ((line = rdr.readLine()) != null) {
+ outputLines.add(line);
+ }
+
+ assertUnorderedOutput(input, outputLines);
+ }
+
+ /**
+ * Uses the given input directory to run through the PXF unit testing
framework. Uses the lines in the file for
+ * output testing.<br>
+ * <br>
+ * Ignores order of records.
+ *
+ * @param input
+ * Input records
+ * @param expectedOutput
+ * File containing output to check
+ * @throws Exception
+ */
+ public void assertUnorderedOutput(Path input, List<String>
expectedOutput) throws Exception {
+
+ setup(input);
+
+ List<String> actualOutput = new ArrayList<String>();
+ for (InputData data : inputs) {
+ ReadAccessor accessor = getReadAccessor(data);
+ ReadResolver resolver = getReadResolver(data);
+
+ actualOutput.addAll(getAllOutput(accessor, resolver));
+ }
+
+ Assert.assertFalse("Output did not match expected output",
compareUnorderedOutput(expectedOutput, actualOutput));
+ }
+
+ /**
+ * Writes the output to the given output stream. Comma delimiter.
+ *
+ * @param input
+ * The input file
+ * @param output
+ * The output stream
+ * @throws Exception
+ */
+ public void writeOutput(Path input, OutputStream output) throws
Exception {
+
+ setup(input);
+
+ for (InputData data : inputs) {
+ ReadAccessor accessor = getReadAccessor(data);
+ ReadResolver resolver = getReadResolver(data);
+
+ for (String line : getAllOutput(accessor, resolver)) {
+ output.write((line + "\n").getBytes());
+ }
+ }
+
+ output.flush();
+ }
+
    /**
     * Gets the class of the implementation of {@link Fragmenter} to be tested.
     *
     * @return The class, or null (the default). setup() throws if this is null, so read
     *         tests must override it.
     */
    public Class<? extends Fragmenter> getFragmenterClass() {
        return null;
    }
+
    /**
     * Gets the class of the implementation of {@link ReadAccessor} to be tested.
     *
     * @return The class, or null (the default). setup() throws if this is null, so read
     *         tests must override it.
     */
    public Class<? extends ReadAccessor> getReadAccessorClass() {
        return null;
    }
+
    /**
     * Gets the class of the implementation of {@link WriteAccessor} to be tested.
     *
     * @return The class, or null (the default). getInputDataForWritableTable() throws if
     *         this is null, so write tests must override it.
     */
    public Class<? extends WriteAccessor> getWriteAccessorClass() {
        return null;
    }
+
    /**
     * Gets the class of the implementation of {@link ReadResolver} to be tested.
     *
     * @return The class, or null (the default). setup() throws if this is null, so read
     *         tests must override it.
     */
    public Class<? extends ReadResolver> getReadResolverClass() {
        return null;
    }
+
    /**
     * Gets the class of the implementation of {@link WriteResolver} to be tested.
     *
     * @return The class, or null (the default). getInputDataForWritableTable() throws if
     *         this is null, so write tests must override it.
     */
    public Class<? extends WriteResolver> getWriteResolverClass() {
        return null;
    }
+
    /**
     * Gets any extra parameters that are meant to be specified for the "pxf" protocol. Note
     * that "X-GP-" is prepended to each parameter name by the test setup.
     *
     * @return Any extra parameters, or null (the default) if none.
     */
    public List<Pair<String, String>> getExtraParams() {
        return null;
    }
+
    /**
     * Gets the column definition names and data types. Types are {@link DataType} objects.
     * Used by setup() and getInputDataForWritableTable() to populate the X-GP-ATTR-* params.
     *
     * @return A list of column definition name/type pairs. Cannot be null.
     */
    public abstract List<Pair<String, DataType>> getColumnDefinitions();
+
    /**
     * Builds an {@link InputData} for a writable table with no input path (a dummy data
     * directory is used instead).
     *
     * @return InputData configured from the write accessor/resolver classes
     */
    protected InputData getInputDataForWritableTable() {
        return getInputDataForWritableTable(null);
    }
+
    /**
     * Builds an {@link InputData} describing a writable table from the write accessor/resolver
     * classes and the column definitions.
     *
     * @param input
     *            Optional input path; when null a dummy data directory is used
     * @return InputData backed by a ProtocolData built from the assembled parameter map
     * @throws IllegalArgumentException
     *             if getWriteAccessorClass() or getWriteResolverClass() returns null
     */
    protected InputData getInputDataForWritableTable(Path input) {

        if (getWriteAccessorClass() == null) {
            throw new IllegalArgumentException("getWriteAccessorClass() must be overwritten to return a non-null object");
        }

        if (getWriteResolverClass() == null) {
            throw new IllegalArgumentException("getWriteResolverClass() must be overwritten to return a non-null object");
        }

        Map<String, String> paramsMap = new HashMap<String, String>();

        paramsMap.put("X-GP-ALIGNMENT", "what");
        paramsMap.put("X-GP-SEGMENT-ID", "1");
        paramsMap.put("X-GP-HAS-FILTER", "0");
        paramsMap.put("X-GP-SEGMENT-COUNT", "1");

        paramsMap.put("X-GP-FORMAT", "GPDBWritable");
        paramsMap.put("X-GP-URL-HOST", "localhost");
        paramsMap.put("X-GP-URL-PORT", "50070");

        // NOTE(review): a non-null input is only used for this null check -- the path itself
        // is never written into the map, so X-GP-DATA-DIR is absent when input != null. Looks
        // like it should be set from input in that case; confirm intended behavior.
        if (input == null) {
            paramsMap.put("X-GP-DATA-DIR", "/dummydata");
        }

        // One X-GP-ATTR-* triple (name, type name, type OID) per column definition.
        List<Pair<String, DataType>> params = getColumnDefinitions();
        paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
        for (int i = 0; i < params.size(); ++i) {
            paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
            paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
            paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
        }

        paramsMap.put("X-GP-ACCESSOR", getWriteAccessorClass().getName());
        paramsMap.put("X-GP-RESOLVER", getWriteResolverClass().getName());

        // Caller-supplied extras, each prefixed with "X-GP-".
        if (getExtraParams() != null) {
            for (Pair<String, String> param : getExtraParams()) {
                paramsMap.put("X-GP-" + param.first, param.second);
            }
        }

        return new ProtocolData(paramsMap);
    }
+
    /**
     * Sets all necessary parameters for the PXF framework to function. Uses the given path as
     * a single input source: runs the configured Fragmenter over it, round-trips the fragments
     * through the PXF JSON response format, and fills {@link #inputs} with one InputData per
     * fragment.
     *
     * @param input
     *            The input path, relative or absolute.
     * @throws Exception
     *             If fragmenting or parsing the fragments response fails
     * @throws IllegalArgumentException
     *             if getFragmenterClass(), getReadAccessorClass() or getReadResolverClass()
     *             returns null
     */
    protected void setup(Path input) throws Exception {

        if (getFragmenterClass() == null) {
            throw new IllegalArgumentException("getFragmenterClass() must be overwritten to return a non-null object");
        }

        if (getReadAccessorClass() == null) {
            throw new IllegalArgumentException("getReadAccessorClass() must be overwritten to return a non-null object");
        }

        if (getReadResolverClass() == null) {
            throw new IllegalArgumentException("getReadResolverClass() must be overwritten to return a non-null object");
        }

        Map<String, String> paramsMap = new HashMap<String, String>();

        // 2.1.0 Properties
        // HDMetaData parameters
        paramsMap.put("X-GP-ALIGNMENT", "what");
        paramsMap.put("X-GP-SEGMENT-ID", "1");
        paramsMap.put("X-GP-HAS-FILTER", "0");
        paramsMap.put("X-GP-SEGMENT-COUNT", "1");
        paramsMap.put("X-GP-FRAGMENTER", getFragmenterClass().getName());
        paramsMap.put("X-GP-FORMAT", "GPDBWritable");
        paramsMap.put("X-GP-URL-HOST", "localhost");
        paramsMap.put("X-GP-URL-PORT", "50070");

        paramsMap.put("X-GP-DATA-DIR", input.toString());

        // One X-GP-ATTR-* triple (name, type name, type OID) per column definition.
        List<Pair<String, DataType>> params = getColumnDefinitions();
        paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
        for (int i = 0; i < params.size(); ++i) {
            paramsMap.put("X-GP-ATTR-NAME" + i, params.get(i).first);
            paramsMap.put("X-GP-ATTR-TYPENAME" + i, params.get(i).second.name());
            paramsMap.put("X-GP-ATTR-TYPECODE" + i, Integer.toString(params.get(i).second.getOID()));
        }

        // HDFSMetaData properties
        paramsMap.put("X-GP-ACCESSOR", getReadAccessorClass().getName());
        paramsMap.put("X-GP-RESOLVER", getReadResolverClass().getName());

        // Caller-supplied extras, each prefixed with "X-GP-".
        if (getExtraParams() != null) {
            for (Pair<String, String> param : getExtraParams()) {
                paramsMap.put("X-GP-" + param.first, param.second);
            }
        }

        LocalInputData fragmentInputData = new LocalInputData(paramsMap);

        // Run the fragmenter, then serialize the fragments to the same JSON the PXF REST
        // service would emit and parse it back, exercising the real response format.
        List<Fragment> fragments = getFragmenter(fragmentInputData).getFragments();

        FragmentsResponse fragmentsResponse = FragmentsResponseFormatter.formatResponse(fragments, input.toString());

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        fragmentsResponse.write(baos);

        String jsonOutput = baos.toString();

        inputs = new ArrayList<InputData>();

        JsonNode node = decodeLineToJsonNode(jsonOutput);

        // Build one InputData per fragment in the response.
        JsonNode fragmentsArray = node.get("PXFFragments");
        int i = 0;
        Iterator<JsonNode> iter = fragmentsArray.getElements();
        while (iter.hasNext()) {
            JsonNode fragNode = iter.next();
            String sourceData = fragNode.get("sourceName").getTextValue();
            // Source names in the response may be relative; force an absolute path.
            if (!sourceData.startsWith("/")) {
                sourceData = "/" + sourceData;
            }
            // NOTE(review): paramsMap is mutated and reused for every fragment -- this
            // assumes LocalInputData copies (or eagerly reads) the map; LocalInputData is
            // not visible in this chunk, so confirm.
            paramsMap.put("X-GP-DATA-DIR", sourceData);
            paramsMap.put("X-GP-FRAGMENT-METADATA", fragNode.get("metadata").getTextValue());
            paramsMap.put("X-GP-DATA-FRAGMENT", Integer.toString(i++));
            inputs.add(new LocalInputData(paramsMap));
        }
    }
+
+ private JsonNode decodeLineToJsonNode(String line) {
--- End diff --
code duplication? can use the same function in json accessor (perhaps move
to JsonUtils class?)
> Add JSON plugin support in code base
> ------------------------------------
>
> Key: HAWQ-178
> URL: https://issues.apache.org/jira/browse/HAWQ-178
> Project: Apache HAWQ
> Issue Type: New Feature
> Components: PXF
> Reporter: Goden Yao
> Assignee: Goden Yao
> Fix For: backlog
>
> Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community,
> there has been a few JSON PXF plugins developed by the community and we'd
> like to see it being incorporated into the code base as an optional package.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)