[ 
https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127168#comment-15127168
 ] 

ASF GitHub Bot commented on HAWQ-178:
-------------------------------------

Github user tzolov commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51492471
  
    --- Diff: 
pxf/pxf-json/src/test/java/org/apache/pxf/hawq/plugins/json/PxfUnit.java ---
    @@ -0,0 +1,666 @@
    +package org.apache.pxf.hawq.plugins.json;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import java.io.BufferedReader;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.InputStreamReader;
    +import java.io.OutputStream;
    +import java.lang.reflect.Constructor;
    +import java.security.InvalidParameterException;
    +import java.util.ArrayList;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.hadoop.conf.Configuration;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hawq.pxf.api.Fragment;
    +import org.apache.hawq.pxf.api.Fragmenter;
    +import org.apache.hawq.pxf.api.OneField;
    +import org.apache.hawq.pxf.api.OneRow;
    +import org.apache.hawq.pxf.api.ReadAccessor;
    +import org.apache.hawq.pxf.api.ReadResolver;
    +import org.apache.hawq.pxf.api.WriteAccessor;
    +import org.apache.hawq.pxf.api.WriteResolver;
    +import org.apache.hawq.pxf.api.io.DataType;
    +import org.apache.hawq.pxf.api.utilities.InputData;
    +import org.apache.hawq.pxf.service.FragmentsResponse;
    +import org.apache.hawq.pxf.service.FragmentsResponseFormatter;
    +import org.apache.hawq.pxf.service.utilities.ProtocolData;
    +import org.codehaus.jackson.JsonFactory;
    +import org.codehaus.jackson.JsonNode;
    +import org.codehaus.jackson.JsonParseException;
    +import org.codehaus.jackson.map.JsonMappingException;
    +import org.codehaus.jackson.map.ObjectMapper;
    +import org.junit.Assert;
    +
    +/**
    + * This abstract class contains a number of helpful utilities in 
developing a PXF extension for HAWQ. Extend this class
    + * and use the various <code>assert</code> methods to check given input 
against known output.
    + */
    +public abstract class PxfUnit {
    +
    +   private static JsonFactory factory = new JsonFactory();
    +   private static ObjectMapper mapper = new ObjectMapper(factory);
    +
    +   protected static List<InputData> inputs = null;
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
    +    * output testing.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertOutput(Path input, Path expectedOutput) throws 
Exception {
    +
    +           BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
    +                           expectedOutput)));
    +
    +           List<String> outputLines = new ArrayList<String>();
    +
    +           String line;
    +           while ((line = rdr.readLine()) != null) {
    +                   outputLines.add(line);
    +           }
    +
    +           assertOutput(input, outputLines);
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
    +    * parameter for output testing.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertOutput(Path input, List<String> expectedOutput) 
throws Exception {
    +
    +           setup(input);
    +           List<String> actualOutput = new ArrayList<String>();
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   actualOutput.addAll(getAllOutput(accessor, resolver));
    +           }
    +
    +           Assert.assertFalse("Output did not match expected output", 
compareOutput(expectedOutput, actualOutput));
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the given
    +    * parameter for output testing.<br>
    +    * <br>
    +    * Ignores order of records.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertUnorderedOutput(Path input, Path expectedOutput) 
throws Exception {
    +           BufferedReader rdr = new BufferedReader(new 
InputStreamReader(FileSystem.get(new Configuration()).open(
    +                           expectedOutput)));
    +
    +           List<String> outputLines = new ArrayList<String>();
    +
    +           String line;
    +           while ((line = rdr.readLine()) != null) {
    +                   outputLines.add(line);
    +           }
    +
    +           assertUnorderedOutput(input, outputLines);
    +   }
    +
    +   /**
    +    * Uses the given input directory to run through the PXF unit testing 
framework. Uses the lines in the file for
    +    * output testing.<br>
    +    * <br>
    +    * Ignores order of records.
    +    * 
    +    * @param input
    +    *            Input records
    +    * @param expectedOutput
    +    *            File containing output to check
    +    * @throws Exception
    +    */
    +   public void assertUnorderedOutput(Path input, List<String> 
expectedOutput) throws Exception {
    +
    +           setup(input);
    +
    +           List<String> actualOutput = new ArrayList<String>();
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   actualOutput.addAll(getAllOutput(accessor, resolver));
    +           }
    +
    +           Assert.assertFalse("Output did not match expected output", 
compareUnorderedOutput(expectedOutput, actualOutput));
    +   }
    +
    +   /**
    +    * Writes the output to the given output stream. Comma delimiter.
    +    * 
    +    * @param input
    +    *            The input file
    +    * @param output
    +    *            The output stream
    +    * @throws Exception
    +    */
    +   public void writeOutput(Path input, OutputStream output) throws 
Exception {
    +
    +           setup(input);
    +
    +           for (InputData data : inputs) {
    +                   ReadAccessor accessor = getReadAccessor(data);
    +                   ReadResolver resolver = getReadResolver(data);
    +
    +                   for (String line : getAllOutput(accessor, resolver)) {
    +                           output.write((line + "\n").getBytes());
    +                   }
    +           }
    +
    +           output.flush();
    +   }
    +
    +   /**
    +    * Get the class of the implementation of Fragmenter to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends Fragmenter> getFragmenterClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of ReadAccessor to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends ReadAccessor> getReadAccessorClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of WriteAccessor to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends WriteAccessor> getWriteAccessorClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of Resolver to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends ReadResolver> getReadResolverClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get the class of the implementation of WriteResolver to be tested.
    +    * 
    +    * @return The class
    +    */
    +   public Class<? extends WriteResolver> getWriteResolverClass() {
    +           return null;
    +   }
    +
    +   /**
    +    * Get any extra parameters that are meant to be specified for the 
"pxf" protocol. Note that "X-GP-" is prepended to
    +    * each parameter name.
    +    * 
    +    * @return Any extra parameters or null if none.
    +    */
    +   public List<Pair<String, String>> getExtraParams() {
    +           return null;
    +   }
    +
    +   /**
    +    * Gets the column definition names and data types. Types are DataType 
objects
    +    * 
    +    * @return A list of column definition name value pairs. Cannot be null.
    +    */
    +   public abstract List<Pair<String, DataType>> getColumnDefinitions();
    +
    +   protected InputData getInputDataForWritableTable() {
    +           return getInputDataForWritableTable(null);
    +   }
    +
    +   protected InputData getInputDataForWritableTable(Path input) {
    +
    +           if (getWriteAccessorClass() == null) {
    +                   throw new IllegalArgumentException(
    +                                   "getWriteAccessorClass() must be 
overwritten to return a non-null object");
    +           }
    +
    +           if (getWriteResolverClass() == null) {
    +                   throw new IllegalArgumentException(
    +                                   "getWriteResolverClass() must be 
overwritten to return a non-null object");
    +           }
    +
    +           Map<String, String> paramsMap = new HashMap<String, String>();
    +
    +           paramsMap.put("X-GP-ALIGNMENT", "what");
    +           paramsMap.put("X-GP-SEGMENT-ID", "1");
    +           paramsMap.put("X-GP-HAS-FILTER", "0");
    +           paramsMap.put("X-GP-SEGMENT-COUNT", "1");
    +
    +           paramsMap.put("X-GP-FORMAT", "GPDBWritable");
    +           paramsMap.put("X-GP-URL-HOST", "localhost");
    +           paramsMap.put("X-GP-URL-PORT", "50070");
    +
    +           if (input == null) {
    +                   paramsMap.put("X-GP-DATA-DIR", "/dummydata");
    +           }
    +
    +           List<Pair<String, DataType>> params = getColumnDefinitions();
    +           paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
    +           for (int i = 0; i < params.size(); ++i) {
    +                   paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
    +                   paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
    +                   paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
    +           }
    +
    +           paramsMap.put("X-GP-ACCESSOR", 
getWriteAccessorClass().getName());
    +           paramsMap.put("X-GP-RESOLVER", 
getWriteResolverClass().getName());
    +
    +           if (getExtraParams() != null) {
    +                   for (Pair<String, String> param : getExtraParams()) {
    +                           paramsMap.put("X-GP-" + param.first, 
param.second);
    +                   }
    +           }
    +
    +           return new ProtocolData(paramsMap);
    +   }
    +
    +   /**
    +    * Set all necessary parameters for GPXF framework to function. Uses 
the given path as a single input split.
    +    * 
    +    * @param input
    +    *            The input path, relative or absolute.
    +    * @throws Exception
    +    */
    +   protected void setup(Path input) throws Exception {
    +
    +           if (getFragmenterClass() == null) {
    +                   throw new 
IllegalArgumentException("getFragmenterClass() must be overwritten to return a 
non-null object");
    +           }
    +
    +           if (getReadAccessorClass() == null) {
    +                   throw new 
IllegalArgumentException("getReadAccessorClass() must be overwritten to return 
a non-null object");
    +           }
    +
    +           if (getReadResolverClass() == null) {
    +                   throw new 
IllegalArgumentException("getReadResolverClass() must be overwritten to return 
a non-null object");
    +           }
    +
    +           Map<String, String> paramsMap = new HashMap<String, String>();
    +
    +           // 2.1.0 Properties
    +           // HDMetaData parameters
    +           paramsMap.put("X-GP-ALIGNMENT", "what");
    +           paramsMap.put("X-GP-SEGMENT-ID", "1");
    +           paramsMap.put("X-GP-HAS-FILTER", "0");
    +           paramsMap.put("X-GP-SEGMENT-COUNT", "1");
    +           paramsMap.put("X-GP-FRAGMENTER", 
getFragmenterClass().getName());
    +           paramsMap.put("X-GP-FORMAT", "GPDBWritable");
    +           paramsMap.put("X-GP-URL-HOST", "localhost");
    +           paramsMap.put("X-GP-URL-PORT", "50070");
    +
    +           paramsMap.put("X-GP-DATA-DIR", input.toString());
    +
    +           List<Pair<String, DataType>> params = getColumnDefinitions();
    +           paramsMap.put("X-GP-ATTRS", Integer.toString(params.size()));
    +           for (int i = 0; i < params.size(); ++i) {
    +                   paramsMap.put("X-GP-ATTR-NAME" + i, 
params.get(i).first);
    +                   paramsMap.put("X-GP-ATTR-TYPENAME" + i, 
params.get(i).second.name());
    +                   paramsMap.put("X-GP-ATTR-TYPECODE" + i, 
Integer.toString(params.get(i).second.getOID()));
    +           }
    +
    +           // HDFSMetaData properties
    +           paramsMap.put("X-GP-ACCESSOR", 
getReadAccessorClass().getName());
    +           paramsMap.put("X-GP-RESOLVER", 
getReadResolverClass().getName());
    +
    +           if (getExtraParams() != null) {
    +                   for (Pair<String, String> param : getExtraParams()) {
    +                           paramsMap.put("X-GP-" + param.first, 
param.second);
    +                   }
    +           }
    +
    +           LocalInputData fragmentInputData = new 
LocalInputData(paramsMap);
    +
    +           List<Fragment> fragments = 
getFragmenter(fragmentInputData).getFragments();
    +
    +           FragmentsResponse fragmentsResponse = 
FragmentsResponseFormatter.formatResponse(fragments, input.toString());
    +
    +           ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +           fragmentsResponse.write(baos);
    +
    +           String jsonOutput = baos.toString();
    +
    +           inputs = new ArrayList<InputData>();
    +
    +           JsonNode node = decodeLineToJsonNode(jsonOutput);
    +
    +           JsonNode fragmentsArray = node.get("PXFFragments");
    +           int i = 0;
    +           Iterator<JsonNode> iter = fragmentsArray.getElements();
    +           while (iter.hasNext()) {
    +                   JsonNode fragNode = iter.next();
    +                   String sourceData = 
fragNode.get("sourceName").getTextValue();
    +                   if (!sourceData.startsWith("/")) {
    +                           sourceData = "/" + sourceData;
    +                   }
    +                   paramsMap.put("X-GP-DATA-DIR", sourceData);
    +                   paramsMap.put("X-GP-FRAGMENT-METADATA", 
fragNode.get("metadata").getTextValue());
    +                   paramsMap.put("X-GP-DATA-FRAGMENT", 
Integer.toString(i++));
    +                   inputs.add(new LocalInputData(paramsMap));
    +           }
    +   }
    +
    +   private JsonNode decodeLineToJsonNode(String line) {
    --- End diff --
    
    The PxfUnit should go in its own standalone project and be used only for 
testing, so it is better to tackle any code duplication in PxfUnit once it is 
factored out of pxf-json. 


> Add JSON plugin support in code base
> ------------------------------------
>
>                 Key: HAWQ-178
>                 URL: https://issues.apache.org/jira/browse/HAWQ-178
>             Project: Apache HAWQ
>          Issue Type: New Feature
>          Components: PXF
>            Reporter: Goden Yao
>            Assignee: Goden Yao
>             Fix For: backlog
>
>         Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community. 
> There have been a few JSON PXF plugins developed by the community, and we'd 
> like to see one incorporated into the code base as an optional package.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to