jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r491620988



##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>
+   * This parameter is introduced to support json string of an object in multiple lines in streaming ingestion records
+   *
+   * It indicates whether the input text can be splitted into lines in first, which will then be parsed into JSON objects one by one independently.

Review comment:
       typo: splitted -> split.

##########
File path: 
core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       Now that this class has been extracted, please add some Javadoc describing what it does and where it is used.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>
+ * In constract to {@link JsonLineReader} which processes input text line by line independently,
+ * this class tries to parse the input text as a whole to an array of objects.
+ *
+ * The input text can be:
+ * 1. a JSON string of an object in a line or multiple lines(such as pretty-printed JSON text)
+ * 2. multiple JSON object strings concated by white space character(s)
+ *
+ * For case 2, what should be noticed is that if an exception is thrown when parsing one JSON string,
+ * the rest JSON text will all be ignored
+ *
+ * For more information, see: https://github.com/apache/druid/pull/10383
+ * </pre>
+ */
+public class JsonReader implements InputEntityReader

Review comment:
       Thinking about this new reader, I think the new requirement of parsing multiple JSON objects into multiple rows doesn't fit the current interface of the sampler. The sampler currently assumes that _there is only one JSON object in an input chunk_, which could contain either an array or a nested object. That is, the current interface allows multiple output rows, since `inputRows` is a list of `InputRow`s in `InputRowListPlusRawValues`, but it doesn't allow multiple values in an input chunk, since `rawValues` is a map in `InputRowListPlusRawValues`. To support the requirement, I think `rawValues` should be a list of maps, so that all raw values in an input chunk can be returned to the sampler.
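A rough sketch of that shape change, using a hypothetical `RowsPlusRawValuesSketch` in place of `InputRowListPlusRawValues`. Its name, its accessors, and the generic row type `R` (standing in for `InputRow`) are assumptions for illustration only:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for InputRowListPlusRawValues after the proposed change.
 * R stands in for Druid's InputRow so the sketch compiles on its own.
 */
public final class RowsPlusRawValuesSketch<R>
{
  // Parsed rows produced from one input chunk.
  private final List<R> inputRows;
  // One raw-value map per JSON object found in the same chunk.
  private final List<Map<String, Object>> rawValuesList;

  public RowsPlusRawValuesSketch(List<R> inputRows, List<Map<String, Object>> rawValuesList)
  {
    // Read-only copies so callers cannot mutate the chunk's contents afterwards.
    this.inputRows = Collections.unmodifiableList(new ArrayList<>(inputRows));
    this.rawValuesList = Collections.unmodifiableList(new ArrayList<>(rawValuesList));
  }

  public List<R> getInputRows()
  {
    return inputRows;
  }

  public List<Map<String, Object>> getRawValuesList()
  {
    return rawValuesList;
  }
}
```

With this shape, an input chunk containing two concatenated JSON objects would carry two raw-value maps instead of a single one.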
   
   If we do this, `JsonReader` can simply extend `IntermediateRowParsingReader`, where the intermediate row will be the [`inputText` created from `source.open()`](https://github.com/apache/druid/pull/10383/files#diff-cc591396000acab135e9bd4aa1f31e2aR72). Here is an example.
   
   ```java
   public class JsonReader2 extends IntermediateRowParsingReader<String>
   {
     private final ObjectFlattener<JsonNode> flattener;
     private final ObjectMapper mapper;
     private final InputEntity source;
     private final InputRowSchema inputRowSchema;
   
     JsonReader2(
         InputRowSchema inputRowSchema,
         InputEntity source,
         JSONPathSpec flattenSpec,
         ObjectMapper mapper,
         boolean keepNullColumns
     )
     {
       this.inputRowSchema = inputRowSchema;
       this.source = source;
       this.flattener = ObjectFlatteners.create(flattenSpec, new JSONFlattenerMaker(keepNullColumns));
       this.mapper = mapper;
     }
   
     @Override
     protected CloseableIterator<String> intermediateRowIterator() throws IOException
     {
       // The whole entity becomes a single intermediate row. Read it eagerly and close the stream.
       try (InputStream in = source.open()) {
         return CloseableIterators.withEmptyBaggage(
             Iterators.singletonIterator(IOUtils.toString(in, StringUtils.UTF8_STRING))
         );
       }
     }
   
     @Override
     protected List<InputRow> parseInputRows(String intermediateRow) throws IOException, ParseException
     {
       // readValues() iterates over every JSON object in the text, so one chunk can yield many rows.
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<JsonNode> delegate = mapper.readValues(parser, JsonNode.class);
         return FluentIterable.from(() -> delegate)
                              .transform(jsonNode -> MapInputRowParser.parse(inputRowSchema, flattener.flatten(jsonNode)))
                              .toList();
       }
     }
   
     @Override
     protected List<Map<String, Object>> toMap(String intermediateRow) throws IOException
     {
       // One raw-value map per JSON object, hence the List return type proposed above.
       try (JsonParser parser = new JsonFactory().createParser(intermediateRow)) {
         final Iterator<Map> delegate = mapper.readValues(parser, Map.class);
         return FluentIterable.from(() -> delegate).transform(map -> (Map<String, Object>) map).toList();
       }
     }
   }
   ```
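`mapper.readValues` above walks a sequence of concatenated JSON objects from a single parser. This is also why, as the `JsonReader` Javadoc notes, text after a malformed object is ignored: the stream can't be resynchronized. The sketch below illustrates that behavior with the standard library only, using a hypothetical brace-matching splitter. It is a stand-in, not Jackson, and not what the PR does. It only detects structural problems (stray text between objects, an unterminated object), not malformed values such as `4xxx`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper: splits text made of whitespace-separated top-level JSON objects.
 * Only braces, strings and escapes are tracked; object contents are not validated.
 */
public final class ConcatenatedJsonSplitter
{
  private ConcatenatedJsonSplitter()
  {
  }

  /**
   * Returns the objects found before the first structural error. Like a streaming
   * parser that cannot resynchronize, everything after the error is dropped.
   */
  public static List<String> splitUntilError(String text)
  {
    final List<String> objects = new ArrayList<>();
    int depth = 0;
    int start = -1;
    boolean inString = false;
    boolean escaped = false;

    for (int i = 0; i < text.length(); i++) {
      final char c = text.charAt(i);
      if (inString) {
        // Inside a string literal, braces are data; only track escapes and the closing quote.
        if (escaped) {
          escaped = false;
        } else if (c == '\\') {
          escaped = true;
        } else if (c == '"') {
          inString = false;
        }
      } else if (depth == 0) {
        if (c == '{') {
          start = i;
          depth = 1;
        } else if (!Character.isWhitespace(c)) {
          return objects; // stray text between objects: stop here
        }
      } else if (c == '"') {
        inString = true;
      } else if (c == '{') {
        depth++;
      } else if (c == '}') {
        depth--;
        if (depth == 0) {
          objects.add(text.substring(start, i + 1));
        }
      }
    }
    // An unterminated trailing object is discarded as well.
    return objects;
  }
}
```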
   

##########
File path: 
core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>

Review comment:
       If `JsonReader` extends `IntermediateRowParsingReader`, this class will not have to be extracted.

##########
File path: 
core/src/test/java/org/apache/druid/data/input/impl/JsonReaderTest.java
##########
@@ -126,138 +149,158 @@ public void testParseRowWithConditional() throws IOException
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
+
         final InputRow row = iterator.next();
-        Assert.assertEquals("test", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals(Collections.emptyList(), row.getDimension("foo"));
-        Assert.assertTrue(row.getDimension("baz").isEmpty());
-        numActualIterations++;
-      }
-      Assert.assertEquals(numExpectedIterations, numActualIterations);
-    }
-  }
 
-  @Test
-  public void testParseRowKeepNullColumns() throws IOException
-  {
-    final JsonInputFormat format = new JsonInputFormat(
-        new JSONPathSpec(
-            true,
-            ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
-            )
-        ),
-        null,
-        true
-    );
+        Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+        Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("path_omg")));
+        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("jq_omg")));
 
-    final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"o\":{\"mg\":null}}")
-    );
+        Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+        Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+        Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
 
-    final InputEntityReader reader = format.createReader(
-        new InputRowSchema(
-            new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
-            Collections.emptyList()
-        ),
-        source,
-        null
-    );
-    final int numExpectedIterations = 1;
-    try (CloseableIterator<InputRow> iterator = reader.read()) {
-      int numActualIterations = 0;
-      while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertTrue(row.getDimension("bar").isEmpty());
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertTrue(row.getDimension("path_omg").isEmpty());
         numActualIterations++;
       }
+
       Assert.assertEquals(numExpectedIterations, numActualIterations);
     }
   }
 
   @Test
-  public void testKeepNullColumnsWithNoNullValues() throws IOException
+  public void testInvalidJSONText() throws IOException
   {
     final JsonInputFormat format = new JsonInputFormat(
         new JSONPathSpec(
             true,
             ImmutableList.of(
-                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg")
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz", "baz"),
+                new JSONPathFieldSpec(JSONPathFieldType.ROOT, "root_baz2", "baz2"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg", "$.o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.PATH, "path_omg2", "$.o.mg2"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg", ".o.mg"),
+                new JSONPathFieldSpec(JSONPathFieldType.JQ, "jq_omg2", ".o.mg2")
             )
         ),
         null,
-        true
+        null
     );
 
+    //make sure JsonReader is used
+    format.setLineSplittable(false);
+
     final ByteEntity source = new ByteEntity(
-        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":1,\"foo\":\"x\",\"o\":{\"mg\":\"a\"}}")
+        StringUtils.toUtf8("{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":1}}"
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4xxx,\"o\":{\"mg\":2}}" //baz property is illegal
+                           + "{\"timestamp\":\"2019-01-01\",\"bar\":null,\"foo\":\"x\",\"baz\":4,\"o\":{\"mg\":3}}")
     );
 
     final InputEntityReader reader = format.createReader(
         new InputRowSchema(
             new TimestampSpec("timestamp", "iso", null),
-            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(Collections.emptyList())),
+            new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("bar", "foo"))),
             Collections.emptyList()
         ),
         source,
         null
     );
+
+    // the 2nd line is ill-formed, it stops to iterate to the 3rd line.
+    // So in total, only 1 lines has been parsed
     final int numExpectedIterations = 1;
+
     try (CloseableIterator<InputRow> iterator = reader.read()) {
       int numActualIterations = 0;
       while (iterator.hasNext()) {
-        final InputRow row = iterator.next();
-        Assert.assertEquals(Arrays.asList("path_omg", "timestamp", "bar", "foo"), row.getDimensions());
-        Assert.assertEquals("1", Iterables.getOnlyElement(row.getDimension("bar")));
-        Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
-        Assert.assertEquals("a", Iterables.getOnlyElement(row.getDimension("path_omg")));
-        numActualIterations++;
+
+        try {
+          final InputRow row = iterator.next();
+
+          final String msgId = String.valueOf(++numActualIterations);
+          Assert.assertEquals(DateTimes.of("2019-01-01"), row.getTimestamp());
+          Assert.assertEquals("x", Iterables.getOnlyElement(row.getDimension("foo")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("baz")));
+          Assert.assertEquals("4", Iterables.getOnlyElement(row.getDimension("root_baz")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("path_omg")));
+          Assert.assertEquals(msgId, Iterables.getOnlyElement(row.getDimension("jq_omg")));
+
+          Assert.assertTrue(row.getDimension("root_baz2").isEmpty());
+          Assert.assertTrue(row.getDimension("path_omg2").isEmpty());
+          Assert.assertTrue(row.getDimension("jq_omg2").isEmpty());
+        }
+        catch (Exception e) {
+          //ignore the exception when parsing the 2nd

Review comment:
       Please verify that `e` is the exception we expect.
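Here is a minimal sketch of that check, in plain Java so it runs standalone. `ExpectedParseException` and the message fragment are hypothetical stand-ins. In the actual test you would catch whatever exception type `JsonReader` really surfaces and assert on it with `Assert`. The key change is to catch only the expected type, so any other failure still fails the test.

```java
import java.util.Iterator;

public final class ExpectedFailureCounter
{
  /** Hypothetical stand-in for the parse exception the reader is expected to throw. */
  public static final class ExpectedParseException extends RuntimeException
  {
    public ExpectedParseException(String message)
    {
      super(message);
    }
  }

  private ExpectedFailureCounter()
  {
  }

  /**
   * Counts rows until the iterator is exhausted. A failing next() is tolerated only
   * when it is the expected exception type and its message mentions the expected
   * fragment; any other exception propagates instead of being silently ignored.
   */
  public static int countRows(Iterator<?> iterator, String expectedMessageFragment)
  {
    int rows = 0;
    while (iterator.hasNext()) {
      try {
        iterator.next();
        rows++;
      }
      catch (ExpectedParseException e) {
        final String message = e.getMessage();
        if (message == null || !message.contains(expectedMessageFragment)) {
          throw new AssertionError("Unexpected parse error message: " + message, e);
        }
      }
    }
    return rows;
  }
}
```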

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       The `pre` tag is meant for pre-formatted text such as source code. I don't think you need it here.

##########
File path: core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
##########
@@ -33,13 +40,98 @@
 
 import java.io.IOException;
 import java.util.Collections;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.NoSuchElementException;
 
-public class JsonReader extends TextReader
+/**
+ * <pre>

Review comment:
       Same comment here. You don't need a `pre` tag.

##########
File path: 
core/src/main/java/org/apache/druid/data/input/ExceptionThrowingIterator.java
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
+{
+  private final RuntimeException exception;
+
+  private boolean thrown = false;
+
+  public ExceptionThrowingIterator(Throwable exception)
+  {
+    this.exception = exception instanceof RuntimeException
+                     ? (RuntimeException) exception
+                     : new RuntimeException(exception);
+  }
+
+  @Override
+  public boolean hasNext()
+  {
+    return !thrown;
+  }
+
+  @Override
+  public T next()
+  {
+    thrown = true;

Review comment:
       Hmm, SpotBugs complains here because `next()` can never throw `NoSuchElementException`. That's a false positive, since `next()` is never called more than once, but let's keep SpotBugs happy by simply adding `if (!hasNext()) { throw new NoSuchElementException(); }` here.
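A sketch of the class with that guard added, plus the kind of Javadoc requested earlier in this review. `CloseableIterator` is redeclared as a minimal stand-in (assumed to be `Iterator` plus `Closeable`) so the snippet compiles on its own. The Javadoc wording is only a suggestion.

```java
import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Minimal stand-in for Druid's CloseableIterator so this sketch compiles on its own. */
interface CloseableIterator<T> extends Iterator<T>, Closeable
{
}

/**
 * An iterator that holds a single exception and throws it on the first call to {@link #next()}.
 * Anything that is not already a {@link RuntimeException} is wrapped in one. After that call,
 * {@link #hasNext()} returns {@code false}.
 *
 * <p>This lets code that has to return an iterator defer a failure until the caller
 * actually starts iterating.
 */
public class ExceptionThrowingIterator<T> implements CloseableIterator<T>
{
  private final RuntimeException exception;

  private boolean thrown = false;

  public ExceptionThrowingIterator(Throwable exception)
  {
    this.exception = exception instanceof RuntimeException
                     ? (RuntimeException) exception
                     : new RuntimeException(exception);
  }

  @Override
  public boolean hasNext()
  {
    return !thrown;
  }

  @Override
  public T next()
  {
    // Honors the Iterator contract (and satisfies SpotBugs) if next() is called again.
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    thrown = true;
    throw exception;
  }

  @Override
  public void close()
  {
    // Nothing to release.
  }
}
```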

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -73,7 +88,14 @@ public boolean isSplittable()
   @Override
   public InputEntityReader createReader(InputRowSchema inputRowSchema, InputEntity source, File temporaryDirectory)
   {
-    return new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+    return this.lineSplittable ?
+           new JsonLineReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns) :
+           new JsonReader(inputRowSchema, source, getFlattenSpec(), objectMapper, keepNullColumns);
+  }
+
+  public void setLineSplittable(boolean lineSplittable)

Review comment:
       This doesn't look pretty, but it is simple enough for handling this exceptional case. We could introduce another layer on top of the input format to make it cleaner, but I don't think that would be worth it, at least at this point.

##########
File path: 
core/src/main/java/org/apache/druid/data/input/impl/JsonInputFormat.java
##########
@@ -41,6 +42,20 @@
   private final ObjectMapper objectMapper;
   private final boolean keepNullColumns;
 
+  /**
+   * <pre>

Review comment:
       Hmm, I think the Javadoc should be clearer about what this flag means. How about rephrasing it as below?
   
   ```
   This parameter indicates whether or not the given InputEntity should be split by lines before parsing it. If it is set to true, the InputEntity must be split by lines first. If it is set to false, contrary to what you might expect, it means that the InputEntity doesn't have to be split by lines first, but it can still contain multiple lines. An InputEntityReader created from this format will determine by itself whether line splitting is necessary.
   
   This parameter should always be true for batch ingestion and false for streaming ingestion. For more information, see: https://github.com/apache/druid/pull/10383.
   ```
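The sketch below shows roughly how the rephrased Javadoc could read on the field, using `<p>` paragraphs instead of `<pre>`, next to the reader selection from `createReader()`. `LineSplittableFormatSketch` and `EntityReaderStub` are hypothetical stand-ins for `JsonInputFormat` and `InputEntityReader`. The stub readers only report which class would be chosen, and the default of `true` is an assumption.

```java
/** Hypothetical stand-in for InputEntityReader; only reports which reader was chosen. */
interface EntityReaderStub
{
  String name();
}

public class LineSplittableFormatSketch
{
  /**
   * Indicates whether the given input entity should be split by lines before parsing it.
   * If {@code true}, the entity must be split by lines first.
   *
   * <p>If {@code false}, the entity does not have to be split by lines first, but it can
   * still contain multiple lines. A reader created from this format determines by itself
   * whether line splitting is necessary.
   *
   * <p>This should be {@code true} for batch ingestion and {@code false} for streaming
   * ingestion.
   */
  private boolean lineSplittable = true; // assumed default

  public void setLineSplittable(boolean lineSplittable)
  {
    this.lineSplittable = lineSplittable;
  }

  /** Mirrors the branch in createReader(): line-by-line reader vs. whole-text reader. */
  public EntityReaderStub createReader()
  {
    return lineSplittable ? () -> "JsonLineReader" : () -> "JsonReader";
  }
}
```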




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


