jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r520018811
##########
File path:
extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
##########
@@ -2726,4 +2726,83 @@ public void close()
null
);
}
+
+  @Test(timeout = 60_000L)
+  public void testMultipleLinesJSONText() throws Exception
+  {
+    reportParseExceptions = false;
+    maxParseExceptions = 1000;
+    maxSavedParseExceptions = 2;
+
+    // Insert data
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+
+      // multiple objects in one Kafka record will yield 2 rows in druid
+      String wellformed = toJsonString(true, "2049", "d2", "y", "10", "22.0", "2.0") +
+                          toJsonString(true, "2049", "d3", "y", "10", "23.0", "3.0");
+
+      // multiple objects in one Kafka record, but some objects are ill-formed;
+      // the whole ProducerRecord will be discarded
+      String illformed = "{\"timestamp\":2049, \"dim1\": \"d4\", \"dim2\":\"x\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"2.0\" }" +
+                         "{\"timestamp\":2049, \"dim1\": \"d5\", \"dim2\":\"y\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":invalidFormat }" +
+                         "{\"timestamp\":2049, \"dim1\": \"d6\", \"dim2\":\"z\", \"dimLong\": 10, \"dimFloat\":\"24.0\", \"met1\":\"3.0\" }";
+
+      ProducerRecord[] producerRecords = new ProducerRecord[]{
+          // pretty formatted
+          new ProducerRecord<>(topic, 0, null, jb(true, "2049", "d1", "y", "10", "20.0", "1.0")),
+          // well-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(wellformed)),
+          // ill-formed
+          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("illformed")),
Review comment:
Did you intend `StringUtils.toUtf8(illformed)`? As written, this sends the string literal `"illformed"`, and the `illformed` variable declared above is never used.
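If so, a suggestion block for that line might look like this (matching the indentation of the surrounding array initializer):

```suggestion
          new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8(illformed)),
```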
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -111,38 +113,49 @@ public SamplerResponse sample(
     try (final CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample();
          final IncrementalIndex<Aggregator> index = buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
          final Closer closer1 = closer) {
-      SamplerResponseRow[] responseRows = new SamplerResponseRow[nonNullSamplerConfig.getNumRows()];
-      int counter = 0, numRowsIndexed = 0;
-
-      while (counter < responseRows.length && iterator.hasNext()) {
-        Map<String, Object> rawColumns = null;
-        try {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
-
-          if (inputRowListPlusRawValues.getRawValues() != null) {
-            rawColumns = inputRowListPlusRawValues.getRawValues();
-          }
-
-          if (inputRowListPlusRawValues.getParseException() != null) {
-            throw inputRowListPlusRawValues.getParseException();
+      List<SamplerResponseRow> responseRows = new ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));
Review comment:
nit: with this change, when a parseException is thrown while parsing a list of rows in one message, those rows are added to `responseRows` as separate rows, each carrying the same parseException. This can be confusing, since the parseException message may look unrelated to the `rawColumns` in a given `responseRow`. IMO, the better fix would be to store the whole `rawColumnsList` in one `SamplerResponseRow`; the parseException could then indicate that it was thrown while parsing one of the rows in `rawColumnsList`. However, this requires a change on the web console side as well, so I'm OK with fixing it in a follow-up PR.
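To make the suggested shape concrete, here is a minimal sketch. `SamplerResponseRowSketch` is a hypothetical stand-in for illustration only, not the actual Druid `SamplerResponseRow`; the field names mirror those used in the diff above.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the suggested SamplerResponseRow shape:
// one response row per message, carrying the whole rawColumnsList and a
// single parse-error message that applies to the message as a whole.
public class SamplerResponseRowSketch
{
  final List<Map<String, Object>> rawColumnsList;
  final boolean unparseable;
  final String error;

  SamplerResponseRowSketch(List<Map<String, Object>> rawColumnsList, boolean unparseable, String error)
  {
    this.rawColumnsList = rawColumnsList;
    this.unparseable = unparseable;
    this.error = error;
  }

  public static void main(String[] args)
  {
    // One Kafka message containing three JSON objects, one of which failed to
    // parse: the raw rows stay together, and the error describes the message.
    SamplerResponseRowSketch row = new SamplerResponseRowSketch(
        List.of(Map.of("dim1", "d4"), Map.of("dim1", "d5"), Map.of("dim1", "d6")),
        true,
        "Unable to parse one of the rows in this message"
    );
    System.out.println(row.rawColumnsList.size() + " raw rows, unparseable=" + row.unparseable);
  }
}
```

With this shape, the web console would render one sampler row per message and could annotate it with the message-level error, instead of repeating the same error across unrelated rows.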
##########
File path:
indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
##########
@@ -1060,6 +1062,107 @@ public void testWithFilter() throws IOException
);
}
+ @Test
+ public void testIndexParseException() throws IOException
Review comment:
Thanks for adding this test! Similar to
https://github.com/apache/druid/pull/10383/files#diff-ef25ac1cc1f275b47b939b65e1d0c8b8b8512aeada52d06b8541b8f381df03eeR2731,
could you please add a unit test for sampling a block of multiple JSON strings? The unit test should run only when `parserType` is `STR_JSON`. [We usually just return from the unit test when the parameter is not what we want to test with](https://github.com/apache/druid/blob/master/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java#L6812-L6814).
It would be nice if the test verifies the following:
- The sampler response when there is no parseException for a block of multiple JSON strings.
- The sampler response when a parseException is thrown while parsing a block of multiple JSON strings. Maybe you can improve this unit test to cover that case as well.
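The early-return gate mentioned above can be sketched roughly as follows. The class, the `parserType` plumbing, and the test body are hypothetical stand-ins; in the real `InputSourceSamplerTest` the parameter would come from the JUnit parameterized-test machinery rather than a method argument.

```java
import java.util.List;

// Minimal sketch of the early-return pattern for parameterized tests:
// when the current parameter is not the one under test, return immediately
// so the test is effectively a no-op for that parameter value.
public class ParserTypeGateSketch
{
  static int jsonRunsExecuted = 0;

  // Stands in for a @Test method in a parameterized test class.
  static void testMultipleJsonRows(String parserType)
  {
    if (!"STR_JSON".equals(parserType)) {
      return; // skip parameters this test does not cover
    }
    jsonRunsExecuted++;
    // ... sample a block of multiple JSON strings here ...
  }

  public static void main(String[] args)
  {
    // A parameterized runner would invoke the test once per parameter value.
    for (String parserType : List.of("STR_CSV", "STR_JSON", "STR_TSV")) {
      testMultipleJsonRows(parserType);
    }
    System.out.println(jsonRunsExecuted); // only the STR_JSON run executes
  }
}
```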
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]