jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520018811



##########
File path: extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -2726,4 +2726,83 @@ public void close()
         null
     );
   }
+
+  @Test(timeout = 60_000L)
+  public void testMultipleLinesJSONText() throws Exception
+  {
+    reportParseExceptions = false;
+    maxParseExceptions = 1000;
+    maxSavedParseExceptions = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+
+      //multiple objects in one Kafka record will yield 2 rows in druid
+      String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
+                     toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
+
+      //multiple objects in one Kafka record but some objects are in ill-formed format
+      //the whole ProducerRecord will be discarded
+      String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
+                     "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+
+      ProducerRecord[] producerRecords = new ProducerRecord[]{
+          // pretty formatted
+          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+          //well-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
+          //ill-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("illformed")),

Review comment:
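       Did you intend `StringUtils.toUtf8(illformed)`? The variable `illformed`
is declared but never used, so this record carries the literal text
"illformed" rather than the three-object payload.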
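
To make the difference concrete, here is a minimal JDK-only sketch. The `toUtf8` helper is a stand-in for `StringUtils.toUtf8` (assumed here to be a plain UTF-8 encode), and the payload is shortened for illustration:

```java
import java.nio.charset.StandardCharsets;

final class LiteralVsVariableSketch
{
  // Stand-in for Druid's StringUtils.toUtf8; assumed to be a plain UTF-8 encode.
  static byte[] toUtf8(String s)
  {
    return s.getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    // Shortened, illustrative payload; the test builds three JSON objects.
    String illformed = "{\"met1\":\"2.0\" }{\"met1\":invalidFormat }";

    // Quoted: encodes the nine characters i-l-l-f-o-r-m-e-d, not the payload.
    byte[] fromLiteral = toUtf8("illformed");
    // Unquoted: encodes the content of the variable.
    byte[] fromVariable = toUtf8(illformed);

    System.out.println(new String(fromLiteral, StandardCharsets.UTF_8));
    System.out.println(new String(fromVariable, StandardCharsets.UTF_8));
  }
}
```

Both values fail to parse as JSON, so the test may still pass with the literal, but it would not be exercising the multi-object case it is meant to cover.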

##########
File path: indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -111,38 +113,49 @@ public SamplerResponse sample(
     try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
          final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
          final Closer closer1 = closer) {
-      SamplerResponseRow[] responseRows = new SamplerResponseRow[nonNullSamplerConfig.getNumRows()];
-      int counter = 0, numRowsIndexed = 0;
-
-      while (counter < responseRows.length && iterator.hasNext()) {
-        Map<String, Object> rawColumns = null;
-        try {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
-
-          if (inputRowListPlusRawValues.getRawValues() != null) {
-            rawColumns = inputRowListPlusRawValues.getRawValues();
-          }
-
-          if (inputRowListPlusRawValues.getParseException() != null) {
-            throw inputRowListPlusRawValues.getParseException();
+      List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       nit: with this change, when a parseException is thrown while parsing a
list of rows in one message, every row in that message is added to
`responseRows` as a separate row, each carrying the same parseException. This
can be confusing because the error message may seem unrelated to the
`rawColumns` in the same `responseRow`. IMO, a better fix would be to store
the whole `rawColumnsList` in one `SamplerResponseRow`, so that the
parseException can indicate that it was thrown while parsing one of the rows in
`rawColumnsList`. However, this requires a change on the web console side as
well. I'm OK with fixing this in a follow-up PR.
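
A rough sketch of the two shapes, to show what I mean. `ResponseRow` below is a hypothetical stand-in, not the real `SamplerResponseRow`, and the method names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

final class GroupedSamplerRowSketch
{
  // Hypothetical stand-in for SamplerResponseRow; the real class has a different shape.
  static final class ResponseRow
  {
    final List<Map<String, Object>> rawColumnsList;
    final String error;

    ResponseRow(List<Map<String, Object>> rawColumnsList, String error)
    {
      this.rawColumnsList = rawColumnsList;
      this.error = error;
    }
  }

  // Behaviour in this diff: one response row per raw row, all repeating the same error.
  static List<ResponseRow> onePerRawRow(List<Map<String, Object>> rawColumnsList, String error)
  {
    List<ResponseRow> rows = new ArrayList<>();
    for (Map<String, Object> rawColumns : rawColumnsList) {
      rows.add(new ResponseRow(Collections.singletonList(rawColumns), error));
    }
    return rows;
  }

  // Suggested alternative: a single response row that holds the whole message.
  static List<ResponseRow> oneForWholeMessage(List<Map<String, Object>> rawColumnsList, String error)
  {
    return Collections.singletonList(new ResponseRow(rawColumnsList, error));
  }

  public static void main(String[] args)
  {
    List<Map<String, Object>> raw = Arrays.asList(
        Collections.<String, Object>singletonMap("dim1", "d4"),
        Collections.<String, Object>singletonMap("dim1", "d5"),
        Collections.<String, Object>singletonMap("dim1", "d6")
    );
    String error = "hypothetical parse error";
    System.out.println(onePerRawRow(raw, error).size());
    System.out.println(oneForWholeMessage(raw, error).size());
  }
}
```

With the grouped shape, the error is attached to the message as a whole instead of appearing next to rows such as `d4` and `d6` that are individually well-formed.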

##########
File path: indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
     );
   }
 
+  @Test
+  public void testIndexParseException() throws IOException

Review comment:
       Thanks for adding this test! Similar to
https://github.com/apache/druid/pull/10383/files#diff-ef25ac1cc1f275b47b939b65e1d0c8b8b8512aeada52d06b8541b8f381df03eeR2731,
could you please add a unit test for sampling a block of multiple JSON
strings? The unit test needs to run only when `parserType` is `STR_JSON`.
[We usually just return in the unit test when the parameter is not what we want to test with](https://github.com/apache/druid/blob/master/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java#L6812-L6814).
It would be nice if the test verifies the following:
   - The sampler response when there is no parseException with a list of
multiple JSON strings.
   - The sampler response when a parseException is thrown while parsing a
list of multiple JSON strings. Maybe you can extend this unit test to cover
that case as well.
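
The early-return convention looks roughly like this. This is a plain-Java sketch without JUnit; the class, the method name, and the `OTHER` constant are placeholders, and in the real test the method would be a `void` `@Test` method in the parameterized test class:

```java
final class ParserTypeGuardSketch
{
  // Hypothetical mirror of the test parameter; only STR_JSON comes from the comment above.
  enum ParserType
  {
    STR_JSON,
    OTHER
  }

  private final ParserType parserType;

  ParserTypeGuardSketch(ParserType parserType)
  {
    this.parserType = parserType;
  }

  // Returns true if the test body ran, false if it was skipped.
  // The boolean exists only so this sketch can show which branch was taken.
  boolean testMultipleJsonStringsInOneBlock()
  {
    if (parserType != ParserType.STR_JSON) {
      // Not the parameter under test: return without asserting anything.
      return false;
    }
    // Assertions on the sampler response would go here.
    return true;
  }

  public static void main(String[] args)
  {
    for (ParserType type : ParserType.values()) {
      boolean ran = new ParserTypeGuardSketch(type).testMultipleJsonStringsInOneBlock();
      System.out.println(type + " ran=" + ran);
    }
  }
}
```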




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


