This is an automated email from the ASF dual-hosted git repository. gparai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 797524dd2df4a18ef17752aff501a9e099f50a93 Author: ozinoviev <[email protected]> AuthorDate: Thu Aug 29 15:14:17 2019 +0300 DRILL-7362: COUNT(*) on JSON with outer list results in JsonParse error closes #1849 --- .../store/easy/json/reader/BaseJsonReader.java | 167 +++++++++++++++++++++ .../store/easy/json/reader/CountingJsonReader.java | 47 ++---- .../drill/exec/vector/complex/fn/JsonReader.java | 126 ++-------------- .../exec/store/json/TestJsonRecordReader.java | 13 ++ 4 files changed, 203 insertions(+), 150 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java new file mode 100644 index 0000000..983aa9f --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/BaseJsonReader.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.easy.json.reader; + +import com.fasterxml.jackson.core.JsonToken; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Basic reader implementation for json documents. + */ +public abstract class BaseJsonReader extends BaseJsonProcessor { + + private static final Logger logger = LoggerFactory.getLogger(BaseJsonReader.class); + + /** + * Describes whether or not this reader can unwrap a single root array record + * and treat it like a set of distinct records. + */ + private final boolean skipOuterList; + + /** + * Whether the reader is currently in a situation where we are unwrapping an + * outer list. + */ + private boolean inOuterList; + + public BaseJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar, boolean skipOuterList) { + super(workBuf, enableNanInf, enableEscapeAnyChar); + this.skipOuterList = skipOuterList; + } + + @Override + public ReadState write(ComplexWriter writer) throws IOException { + + try { + JsonToken t = lastSeenJsonToken; + if (t == null || t == JsonToken.END_OBJECT) { + t = parser.nextToken(); + } + while (!parser.hasCurrentToken() && !parser.isClosed()) { + t = parser.nextToken(); + } + lastSeenJsonToken = null; + + if (parser.isClosed()) { + return ReadState.END_OF_STREAM; + } + + ReadState readState = writeToVector(writer, t); + + switch (readState) { + case END_OF_STREAM: + case WRITE_SUCCEED: + return readState; + default: + throw getExceptionWithContext(UserException.dataReadError(), null).message( + "Failure while reading JSON. (Got an invalid read state %s )", readState.toString()) + .build(logger); + } + } catch (com.fasterxml.jackson.core.JsonParseException ex) { + if (ignoreJSONParseError()) { + if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) { + return ReadState.JSON_RECORD_PARSE_EOF_ERROR; + } else { + return ReadState.JSON_RECORD_PARSE_ERROR; + } + } else { + throw ex; + } + } + } + + + private ReadState writeToVector(ComplexWriter writer, JsonToken t) + throws IOException { + + switch (t) { + case START_OBJECT: + writeDocument(writer, t); + break; + case START_ARRAY: + if (inOuterList) { + throw createDocumentTopLevelException(); + } + + if (skipOuterList) { + t = parser.nextToken(); + if (t == JsonToken.START_OBJECT) { + inOuterList = true; + writeDocument(writer, t); + } else { + throw createDocumentTopLevelException(); + } + + } else { + writeDocument(writer, t); + } + break; + case END_ARRAY: + + if (inOuterList) { + confirmLast(); + return ReadState.END_OF_STREAM; + } else { + throw getExceptionWithContext(UserException.dataReadError(), null).message( + "Failure while parsing JSON. Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger); + } + + case NOT_AVAILABLE: + return ReadState.END_OF_STREAM; + default: + throw getExceptionWithContext(UserException.dataReadError(), null) + .message( + "Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing " + + "json strings that contain either lists or maps. The root object cannot be a scalar.", + t).build(logger); + } + + return ReadState.WRITE_SUCCEED; + } + + /** + * Writes the contents of the json node starting with the specified token into a complex vector. + * Token can take the following values: + * - START_ARRAY - the top level of json document is an array and skipping of the outer list is disabled + * - START_OBJECT - the top level of json document is a set of white space delimited maps + * or skipping of the outer list is enabled + */ + protected abstract void writeDocument(ComplexWriter writer, JsonToken t) throws IOException; + + protected UserException createDocumentTopLevelException() { + String message = "The top level of your document must either be a single array of maps or a set " + + "of white space delimited maps."; + return getExceptionWithContext(UserException.dataReadError(), message).build(logger); + } + + private void confirmLast() throws IOException { + parser.nextToken(); + if (!parser.isClosed()) { + String message = "Drill attempted to unwrap a toplevel list in your document. " + + "However, it appears that there is trailing content after this top level list. Drill only " + + "supports querying a set of distinct maps or a single json array with multiple inner maps."; + throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); + } + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java index 73b93f4..c9bcb0d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/reader/CountingJsonReader.java @@ -23,50 +23,29 @@ import com.fasterxml.jackson.core.JsonToken; import io.netty.buffer.DrillBuf; -import org.apache.drill.exec.vector.complex.writer.BaseWriter; +import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; -public class CountingJsonReader extends BaseJsonProcessor { +public class CountingJsonReader extends BaseJsonReader { public CountingJsonReader(DrillBuf workBuf, boolean enableNanInf, boolean enableEscapeAnyChar) { - super(workBuf, enableNanInf, enableEscapeAnyChar); + super(workBuf, enableNanInf, enableEscapeAnyChar, true); } @Override - public ReadState write(BaseWriter.ComplexWriter writer) throws IOException { - try { - JsonToken token = lastSeenJsonToken; - if (token == null || token == JsonToken.END_OBJECT){ - token = parser.nextToken(); - } - lastSeenJsonToken = null; - if (token == JsonToken.FIELD_NAME) { - currentFieldName = parser.getText(); - } - if (!parser.hasCurrentToken()) { - return ReadState.END_OF_STREAM; - } else if (token != JsonToken.START_OBJECT) { - throw new com.fasterxml.jackson.core.JsonParseException( - parser, String.format("Cannot read from the middle of a record. Current token was %s ", token)); - } - writer.rootAsMap().bit("count").writeBit(1); - parser.skipChildren(); - } catch (com.fasterxml.jackson.core.JsonParseException ex) { - if (ignoreJSONParseError()) { - if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM){ - return ReadState.JSON_RECORD_PARSE_EOF_ERROR; - } - else{ - return ReadState.JSON_RECORD_PARSE_ERROR; - } - } else { - throw ex; - } + protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException { + switch (t) { + case START_OBJECT: + case START_ARRAY: + writer.rootAsMap().bit("count").writeBit(1); + parser.skipChildren(); + break; + default: + throw createDocumentTopLevelException(); } - return ReadState.WRITE_SUCCEED; } @Override - public void ensureAtLeastOneField(BaseWriter.ComplexWriter writer) { + public void ensureAtLeastOneField(ComplexWriter writer) { } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java index 3426243..ec838d0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/fn/JsonReader.java @@ -26,7 +26,7 @@ import java.util.List; import org.apache.drill.common.exceptions.UserException; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.physical.base.GroupScan; -import org.apache.drill.exec.store.easy.json.reader.BaseJsonProcessor; +import org.apache.drill.exec.store.easy.json.reader.BaseJsonReader; import org.apache.drill.exec.vector.complex.fn.VectorOutput.ListVectorOutput; import org.apache.drill.exec.vector.complex.fn.VectorOutput.MapVectorOutput; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; @@ -40,7 +40,7 @@ import org.apache.drill.shaded.guava.com.google.common.base.Charsets; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; import org.apache.drill.shaded.guava.com.google.common.collect.Lists; -public class JsonReader extends BaseJsonProcessor { +public class JsonReader extends BaseJsonReader { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory .getLogger(JsonReader.class); public final static int MAX_RECORD_SIZE = 128 * 1024; @@ -59,25 +59,12 @@ public class JsonReader extends BaseJsonProcessor { */ private final List<ListWriter> emptyArrayWriters = Lists.newArrayList(); - /** - * Describes whether or not this reader can unwrap a single root array record - * and treat it like a set of distinct records. - */ - private final boolean skipOuterList; - - /** - * Whether the reader is currently in a situation where we are unwrapping an - * outer list. - */ - private boolean inOuterList; - private FieldSelection selection; private JsonReader(Builder builder) { - super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar); + super(builder.managedBuf, builder.enableNanInf, builder.enableEscapeAnyChar, builder.skipOuterList); selection = FieldSelection.getFieldSelection(builder.columns); workingBuffer = builder.workingBuffer; - skipOuterList = builder.skipOuterList; allTextMode = builder.allTextMode; columns = builder.columns; mapOutput = builder.mapOutput; @@ -184,110 +171,17 @@ public class JsonReader extends BaseJsonProcessor { } @Override - public ReadState write(ComplexWriter writer) throws IOException { - - ReadState readState = null; - try { - JsonToken t = lastSeenJsonToken; - if (t == null || t == JsonToken.END_OBJECT) { - t = parser.nextToken(); - } - while (!parser.hasCurrentToken() && !parser.isClosed()) { - t = parser.nextToken(); - } - lastSeenJsonToken = null; - - if (parser.isClosed()) { - return ReadState.END_OF_STREAM; - } - - readState = writeToVector(writer, t); - - switch (readState) { - case END_OF_STREAM: + protected void writeDocument(ComplexWriter writer, JsonToken t) throws IOException { + switch (t) { + case START_OBJECT: + writeDataSwitch(writer.rootAsMap()); break; - case WRITE_SUCCEED: + case START_ARRAY: + writeDataSwitch(writer.rootAsList()); break; default: - throw getExceptionWithContext(UserException.dataReadError(), null).message( - "Failure while reading JSON. (Got an invalid read state %s )", readState.toString()) - .build(logger); - } - } catch (com.fasterxml.jackson.core.JsonParseException ex) { - if (ignoreJSONParseError()) { - if (processJSONException() == JsonExceptionProcessingState.END_OF_STREAM) { - return ReadState.JSON_RECORD_PARSE_EOF_ERROR; - } else { - return ReadState.JSON_RECORD_PARSE_ERROR; - } - } else { - throw ex; - } - } - return readState; - } - - private void confirmLast() throws IOException { - parser.nextToken(); - if (!parser.isClosed()) { - String message = "Drill attempted to unwrap a toplevel list in your document. " - + "However, it appears that there is trailing content after this top level list. Drill only " - + "supports querying a set of distinct maps or a single json array with multiple inner maps."; - throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); - } - } - - private ReadState writeToVector(ComplexWriter writer, JsonToken t) - throws IOException { - - switch (t) { - case START_OBJECT: - writeDataSwitch(writer.rootAsMap()); - break; - case START_ARRAY: - if (inOuterList) { - String message = "The top level of your document must either be a single array of maps or a set " - + "of white space delimited maps."; - throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); - } - - if (skipOuterList) { - t = parser.nextToken(); - if (t == JsonToken.START_OBJECT) { - inOuterList = true; - writeDataSwitch(writer.rootAsMap()); - } else { - String message = "The top level of your document must either be a single array of maps or a set " - + "of white space delimited maps."; - throw getExceptionWithContext(UserException.dataReadError(), message).build(logger); - } - - } else { - writeDataSwitch(writer.rootAsList()); - } - break; - case END_ARRAY: - - if (inOuterList) { - confirmLast(); - return ReadState.END_OF_STREAM; - } else { - throw getExceptionWithContext(UserException.dataReadError(), null).message( - "Failure while parsing JSON. Ran across unexpected %s.", JsonToken.END_ARRAY).build(logger); - } - - case NOT_AVAILABLE: - return ReadState.END_OF_STREAM; - default: - throw getExceptionWithContext(UserException.dataReadError(), null) - .message( - "Failure while parsing JSON. Found token of [%s]. Drill currently only supports parsing " - + "json strings that contain either lists or maps. The root object cannot be a scalar.", - t).build(logger); + throw createDocumentTopLevelException(); } - - return ReadState.WRITE_SUCCEED; - } private void writeDataSwitch(MapWriter w) throws IOException { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java index 7a601eb..719c3a3 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/json/TestJsonRecordReader.java @@ -261,4 +261,17 @@ public class TestJsonRecordReader extends BaseTestQuery { } throw new Exception("testNotCountingQueryNotSkippingInvalidJSONRecords"); } + + @Test + @Category(UnlikelyTest.class) + // See DRILL-7362 + /* Test for CountingJSONReader */ + public void testContainingArrayCount() throws Exception { + testBuilder() + .sqlQuery("select count(*) as cnt from cp.`store/json/listdoc.json`") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(2L) + .go(); + } }
