jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r499784043
##########
File path:
core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -123,7 +138,7 @@ public void close() throws IOException
/**
* Parses the given intermediate row into a list of {@link InputRow}s.
- * This should return a non-empty list.
+ * This should return a non-empty list
Review comment:
Accidental change?
##########
File path:
core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
public CloseableIterator<InputRowListPlusRawValues> sample() throws
IOException
{
return intermediateRowIterator().map(row -> {
- final Map<String, Object> rawColumns;
+
+ final List<Map<String, Object>> rawColumnsList;
try {
- rawColumns = toMap(row);
+ rawColumnsList = toMap(row);
}
catch (Exception e) {
- return InputRowListPlusRawValues.of(null, new ParseException(e,
"Unable to parse row [%s] into JSON", row));
+ return Collections.singletonList(InputRowListPlusRawValues.of(null,
+ new
ParseException(e, "Unable to parse row [%s] into JSON", row)));
}
+
+ List<InputRow> rows;
try {
- return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+ rows = parseInputRows(row);
}
catch (ParseException e) {
- return InputRowListPlusRawValues.of(rawColumns, e);
+ return
Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty()
? null : rawColumnsList.get(0), e));
}
catch (IOException e) {
- return InputRowListPlusRawValues.of(rawColumns, new ParseException(e,
"Unable to parse row [%s] into inputRow", row));
+ return
Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty()
? null : rawColumnsList.get(0),
+ new
ParseException(e, "Unable to parse row [%s] into inputRow", row)));
+ }
+
+ List<InputRowListPlusRawValues> list = new
ArrayList<InputRowListPlusRawValues>();
+ for (int i = 0; i < Math.min(rows.size(), rawColumnsList.size()); i++) {
Review comment:
I think we should check if the sizes of `rawColumnsList` and `rows` are
same (as in the Javadoc contract) instead of computing a min of them.
##########
File path:
core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
public CloseableIterator<InputRowListPlusRawValues> sample() throws
IOException
{
return intermediateRowIterator().map(row -> {
- final Map<String, Object> rawColumns;
+
+ final List<Map<String, Object>> rawColumnsList;
try {
- rawColumns = toMap(row);
+ rawColumnsList = toMap(row);
}
catch (Exception e) {
- return InputRowListPlusRawValues.of(null, new ParseException(e,
"Unable to parse row [%s] into JSON", row));
+ return Collections.singletonList(InputRowListPlusRawValues.of(null,
+ new
ParseException(e, "Unable to parse row [%s] into JSON", row)));
}
+
+ List<InputRow> rows;
try {
- return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+ rows = parseInputRows(row);
}
catch (ParseException e) {
- return InputRowListPlusRawValues.of(rawColumns, e);
+ return
Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty()
? null : rawColumnsList.get(0), e));
Review comment:
Should we return the entire `rawColumnsList` instead of the first
element? I thought `InputRowListPlusRawValues` will be able to have a list of
`rawColumns`.
##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -49,41 +73,57 @@
boolean keepNullColumns
)
{
- super(inputRowSchema, source);
+ this.inputRowSchema = inputRowSchema;
+ this.source = source;
this.flattener = ObjectFlatteners.create(flattenSpec, new
JSONFlattenerMaker(keepNullColumns));
this.mapper = mapper;
}
@Override
- public List<InputRow> parseInputRows(String line) throws IOException,
ParseException
+ protected CloseableIterator<String> intermediateRowIterator() throws
IOException
{
- final JsonNode document = mapper.readValue(line, JsonNode.class);
- final Map<String, Object> flattened = flattener.flatten(document);
- return
Collections.singletonList(MapInputRowParser.parse(getInputRowSchema(),
flattened));
+ return CloseableIterators.withEmptyBaggage(
+ Iterators.singletonIterator(IOUtils.toString(source.open(),
StringUtils.UTF8_STRING))
+ );
}
@Override
- public Map<String, Object> toMap(String intermediateRow) throws IOException
+ protected List<InputRow> parseInputRows(String intermediateRow) throws
IOException, ParseException
{
- //noinspection unchecked
- return mapper.readValue(intermediateRow, Map.class);
- }
+ try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
+ final Iterator<JsonNode> delegate = mapper.readValues(parser,
JsonNode.class);
+ return FluentIterable.from(() -> delegate)
+ .transform(jsonNode ->
MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
+ .toList();
+ }
+ catch (RuntimeException e) {
+ //convert Jackson's JsonParseException into druid's exception for
further processing
+ if (e.getCause() instanceof JsonParseException) {
Review comment:
Where is `JsonParseException` thrown wrapped in `RuntimeException`? Can
you add a comment about it?
##########
File path:
core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -96,23 +99,35 @@ public void close() throws IOException
public CloseableIterator<InputRowListPlusRawValues> sample() throws
IOException
{
return intermediateRowIterator().map(row -> {
- final Map<String, Object> rawColumns;
+
+ final List<Map<String, Object>> rawColumnsList;
try {
- rawColumns = toMap(row);
+ rawColumnsList = toMap(row);
}
catch (Exception e) {
- return InputRowListPlusRawValues.of(null, new ParseException(e,
"Unable to parse row [%s] into JSON", row));
+ return Collections.singletonList(InputRowListPlusRawValues.of(null,
+ new
ParseException(e, "Unable to parse row [%s] into JSON", row)));
}
+
+ List<InputRow> rows;
try {
- return InputRowListPlusRawValues.of(parseInputRows(row), rawColumns);
+ rows = parseInputRows(row);
}
catch (ParseException e) {
- return InputRowListPlusRawValues.of(rawColumns, e);
+ return
Collections.singletonList(InputRowListPlusRawValues.of(rawColumnsList.isEmpty()
? null : rawColumnsList.get(0), e));
Review comment:
Also, should we fail if `rawColumnsList.isEmpty()`?
##########
File path:
core/src/test/java/org/apache/druid/data/input/impl/JsonInputFormatTest.java
##########
@@ -69,7 +69,7 @@ public void testEquals()
new ObjectMapper(),
new ObjectMapper()
)
- .withIgnoredFields("objectMapper")
+ .withIgnoredFields("objectMapper", "lineSplittable")
Review comment:
`lineSplittable` shouldn't be ignored, but included in `equals()` and
`hashCode()`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]