[
https://issues.apache.org/jira/browse/HAWQ-178?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15127427#comment-15127427
]
ASF GitHub Bot commented on HAWQ-178:
-------------------------------------
Github user hornn commented on a diff in the pull request:
https://github.com/apache/incubator-hawq/pull/302#discussion_r51511168
--- Diff:
pxf/pxf-json/src/main/java/org/apache/hawq/pxf/plugins/json/JsonRecordReader.java
---
@@ -0,0 +1,166 @@
+package org.apache.hawq.pxf.plugins.json;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.security.InvalidParameterException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hawq.pxf.api.Fragment;
+
+/**
+ * {@link RecordReader} implementation that can read multiline JSON objects.
+ */
+public class JsonRecordReader implements RecordReader<LongWritable, Text> {
+
+ private static final Log LOG = LogFactory.getLog(JsonRecordReader.class);
+
+ public static final String RECORD_IDENTIFIER = "json.input.format.record.identifier";
+
+ private JsonStreamReader streamReader = null;
+ private long start = 0, end = 0;
+ private float toRead = 0;
+ private String identifier = null;
+
+ /**
+ * @param conf
+ * Hadoop context
+ * @param split
+ * HDFS Split as defined by the {@link Fragment}.
+ * @throws IOException if the underlying file cannot be opened
+ */
+ public JsonRecordReader(JobConf conf, FileSplit split) throws IOException {
+ LOG.debug("JsonRecordReader constructor called. Conf is " + conf + ". Split is " + split);
+ this.identifier = conf.get(RECORD_IDENTIFIER);
+ LOG.debug("Identifier is " + this.identifier);
+
+ if (isEmpty(this.identifier)) {
+ throw new InvalidParameterException(RECORD_IDENTIFIER + " is not set.");
+ } else {
+ LOG.debug("Initializing JsonRecordReader with identifier " + identifier);
+ }
+
+ // get relevant data
+ Path file = split.getPath();
+
+ LOG.debug("File is " + file);
+
+ start = split.getStart();
+ end = start + split.getLength();
+ toRead = end - start;
+ LOG.debug("FileSystem is " + FileSystem.get(conf));
+
+ FSDataInputStream strm = FileSystem.get(conf).open(file);
+
+ LOG.debug("Retrieved file stream ");
+
+ if (start != 0) {
+ strm.seek(start);
+ }
+
+ streamReader = new JsonStreamReader(identifier, new BufferedInputStream(strm));
+
+ LOG.debug("Reader is " + streamReader);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean next(LongWritable key, Text value) throws IOException {
+
+ boolean retval = false;
+ boolean keepGoing = false;
+ do {
+ // Exit condition (end of block/file)
+ if (streamReader.getBytesRead() >= (end - start)) {
+ return false;
+ }
+
+ keepGoing = false;
+ String record = streamReader.getJsonRecord();
+ if (record != null) {
+ if (JsonUtil.decodeLineToJsonNode(record) == null) {
--- End diff ---
We call decodeLineToJsonNode twice for each record: once here (while
reading) and once again in the resolver. I don't know whether parsing
itself is a heavy operation, but it is slow because this function is
synchronized, so concurrent requests will be queued behind it.
Consider removing this check here and failing in the resolver instead. We
also have the notion of a BadRecordException that can be used when a
record is malformed.
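A minimal sketch of the suggested restructuring (the resolver context is elided, and everything except `decodeLineToJsonNode` and `BadRecordException`, which the comment refers to, is hypothetical):

```java
// In JsonRecordReader.next(): hand the raw record through without pre-parsing,
// so the synchronized decodeLineToJsonNode is invoked only once per record.
String record = streamReader.getJsonRecord();
if (record != null) {
    key.set(streamReader.getBytesRead());
    value.set(record);
    retval = true;
}

// In the resolver (hypothetical location): parse once, and surface bad input
// via the existing BadRecordException instead of silently dropping it here.
JsonNode root = JsonUtil.decodeLineToJsonNode(record);
if (root == null) {
    throw new BadRecordException("Malformed JSON record: " + record);
}
```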
> Add JSON plugin support in code base
> ------------------------------------
>
> Key: HAWQ-178
> URL: https://issues.apache.org/jira/browse/HAWQ-178
> Project: Apache HAWQ
> Issue Type: New Feature
> Components: PXF
> Reporter: Goden Yao
> Assignee: Goden Yao
> Fix For: backlog
>
> Attachments: PXFJSONPluginforHAWQ2.0andPXF3.0.0.pdf,
> PXFJSONPluginforHAWQ2.0andPXF3.0.0v.2.pdf
>
>
> JSON has become a popular format in HDFS as well as in the community.
> A few JSON PXF plugins have been developed by the community, and we'd
> like to see one incorporated into the code base as an optional package.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)