[ 
https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127427#comment-15127427
 ] 

ASF GitHub Bot commented on HAWQ-178:
-------------------------------------

Github user hornn commented on a diff in the pull request:

    https://github.com/apache/incubator-hawq/pull/302#discussion_r51511168
  
    --- Diff: 
pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
 ---
    @@ -0,0 +1,166 @@
    +package org.apache.hawq.pxf.plugins.json;
    +
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + * 
    + *   http://www.apache.org/licenses/LICENSE-2.0
    + * 
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +
    +import static org.apache.commons.lang3.StringUtils.isEmpty;
    +
    +import java.io.BufferedInputStream;
    +import java.io.IOException;
    +import java.security.InvalidParameterException;
    +
    +import org.apache.commons.logging.Log;
    +import org.apache.commons.logging.LogFactory;
    +import org.apache.hadoop.fs.FSDataInputStream;
    +import org.apache.hadoop.fs.FileSystem;
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.LongWritable;
    +import org.apache.hadoop.io.Text;
    +import org.apache.hadoop.mapred.FileSplit;
    +import org.apache.hadoop.mapred.JobConf;
    +import org.apache.hadoop.mapred.RecordReader;
    +import org.apache.hawq.pxf.api.Fragment;
    +
    +/**
    + * {@link RecordReader} implementation that can read multiline JSON 
objects.
    + */
    +public class JsonRecordReader implements RecordReader<LongWritable, Text> {
    +
    +   private static final Log LOG = 
LogFactory.getLog(JsonRecordReader.class);
    +
    +   public static final String RECORD_IDENTIFIER = 
"json.input.format.record.identifier";
    +
    +   private JsonStreamReader streamReader = null;
    +   private long start = 0, end = 0;
    +   private float toRead = 0;
    +   private String identifier = null;
    +
    +   /**
    +    * @param conf
    +    *            Hadoop context
    +    * @param split
    +    *            HDFS Split as defined by the {@link Fragment}.
    +    * @throws IOException
    +    */
    +   public JsonRecordReader(JobConf conf, FileSplit split) throws 
IOException {
    +           LOG.debug("JsonRecordReader constructor called.  Conf is " + 
conf + ". Split is " + split);
    +           this.identifier = conf.get(RECORD_IDENTIFIER);
    +           LOG.debug("Identifier is " + this.identifier);
    +
    +           if (isEmpty(this.identifier)) {
    +                   throw new InvalidParameterException(RECORD_IDENTIFIER + 
" is not set.");
    +           } else {
    +                   LOG.debug("Initializing JsonRecordReader with 
identifier " + identifier);
    +           }
    +
    +           // get relevant data
    +           Path file = split.getPath();
    +
    +           LOG.debug("File is " + file);
    +
    +           start = split.getStart();
    +           end = start + split.getLength();
    +           toRead = end - start;
    +           LOG.debug("FileSystem is " + FileSystem.get(conf));
    +
    +           FSDataInputStream strm = FileSystem.get(conf).open(file);
    +
    +           LOG.debug("Retrieved file stream ");
    +
    +           if (start != 0) {
    +                   strm.seek(start);
    +           }
    +
    +           streamReader = new JsonStreamReader(identifier, new 
BufferedInputStream(strm));
    +
    +           LOG.debug("Reader is " + streamReader);
    +   }
    +
    +   /*
    +    * {@inheritDoc}
    +    */
    +   @Override
    +   public boolean next(LongWritable key, Text value) throws IOException {
    +
    +           boolean retval = false;
    +           boolean keepGoing = false;
    +           do {
    +                   // Exit condition (end of block/file)
    +                   if (streamReader.getBytesRead() >= (end - start)) {
    +                           return false;
    +                   }
    +
    +                   keepGoing = false;
    +                   String record = streamReader.getJsonRecord();
    +                   if (record != null) {
    +                           if (JsonUtil.decodeLineToJsonNode(record) == 
null) {
    --- End diff --
    
    We call decodeLineToJsonNode twice for each record - once here 
(while accessing) and once in the resolver. I don't know whether it is a heavy 
operation, but it is slow because this function is synchronized, so 
multiple requests will be queued.
    Consider removing this logic here and failing in the resolver instead. We 
also have a notion of BadRecordException that can be used when there is a 
malformed record. 


> Add JSON plugin support in code base
> ------------------------------------
>
>                 Key: HAWQ-178
>                 URL: https://issues.apache.org/jira/browse/HAWQ-178
>             Project: Apache HAWQ
>          Issue Type: New Feature
>          Components: PXF
>            Reporter: Goden Yao
>            Assignee: Goden Yao
>             Fix For: backlog
>
>         Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf, 
> PXFJSONPluginforHAWQ2.0andPXF3.0.0v.2.pdf
>
>
> JSON has been a popular format used in HDFS as well as in the community. 
> There have been a few JSON PXF plugins developed by the community, and we'd 
> like to see such a plugin incorporated into the code base as an optional package.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to