jihoonson commented on a change in pull request #10383:
URL: https://github.com/apache/druid/pull/10383#discussion_r510469802



##########
File path: 
core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -82,8 +121,16 @@ private InputRowListPlusRawValues(
     return inputRows;
   }
 
+  /**
+   * This method is left here only for test cases
+   */
   @Nullable
   public Map<String, Object> getRawValues()
+  {
+    return CollectionUtils.isNullOrEmpty(rawValues) ? null : rawValues.get(0);

Review comment:
       nit: it would be better to use `Iterables.getOnlyElement(rawValues)` here, 
so that the call fails if `rawValues` ever holds more than one element instead 
of silently returning the first.
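
To illustrate the nit, here is a minimal stdlib-only sketch. `OnlyElementSketch` and its `getRawValues` are hypothetical stand-ins, not the Druid class, and the size check is hand-written to mimic what Guava's `Iterables.getOnlyElement` does for a multi-element input (it throws `IllegalArgumentException`). It assumes the method should still return `null` for a null or empty list, as the current code does.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; only illustrates the "only element" check.
class OnlyElementSketch
{
  // Returns null for a null or empty list and the single element for a
  // singleton list. Fails loudly for longer lists instead of silently
  // dropping everything after the first element.
  static Map<String, Object> getRawValues(List<Map<String, Object>> rawValues)
  {
    if (rawValues == null || rawValues.isEmpty()) {
      return null;
    }
    if (rawValues.size() > 1) {
      throw new IllegalArgumentException("expected one element but found " + rawValues.size());
    }
    return rawValues.get(0);
  }

  public static void main(String[] args)
  {
    Map<String, Object> a = Collections.singletonMap("x", 1);
    Map<String, Object> b = Collections.singletonMap("x", 2);
    System.out.println(getRawValues(Collections.singletonList(a)));
    try {
      getRawValues(Arrays.asList(a, b));
    }
    catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

With `rawValues.get(0)` the two-element call would quietly return the first map, which is the case the nit is about.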

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
##########
@@ -131,17 +130,27 @@ public SamplerResponse sample(
             continue;
           }
 
-          for (InputRow row : inputRowListPlusRawValues.getInputRows()) {
-            index.add(new SamplerInputRow(row, counter), true);
+          for (int i = 0; i < rawColumnsList.size(); i++) {
+            Map<String, Object> rawColumns = rawColumnsList.get(i);
+            InputRow row = inputRowListPlusRawValues.getInputRows().get(i);
+
+            //keep the index of the row to be added to responseRows for further use
+            final int rowIndex = responseRows.size();
+            index.add(new SamplerInputRow(row, rowIndex), true);
+
             // store the raw value; will be merged with the data from the IncrementalIndex later
-            responseRows[counter] = new SamplerResponseRow(rawColumns, null, null, null);
-            counter++;
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, null));
             numRowsIndexed++;
           }
         }
         catch (ParseException e) {
-          responseRows[counter] = new SamplerResponseRow(rawColumns, null, true, e.getMessage());
-          counter++;
+          if (rawColumnsList != null) {
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new SamplerResponseRow(rawColumns, null, true, e.getMessage()))
+                                              .collect(Collectors.toList()));

Review comment:
       Can this introduce duplicate rows in `responseRows`? Suppose you have 3 
`rawColumns` in `rawColumnsList`, with an unparseable row at the second 
position. The first row will be added to `index`, and thus a new 
`SamplerResponseRow` for the first `rawColumns` will be added to `responseRows`. 
But for the second row, `index.add()` will throw a `ParseException`, which will 
execute these lines and add every entry of `rawColumnsList` again. In this case, 
the `rawColumns` of the first row will appear twice in `responseRows`.
   
   Looking at [what `JsonReader` 
does](https://github.com/apache/druid/pull/10383/files#diff-c745a28591c789098f0a9efd75769ad242b6c40379d155fc61143ed63dccc995R93-R108),
 it seems to throw away the whole `intermediateRow` when any row in it is 
unparseable. The sampler's behavior should match the actual ingestion.
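
A small simulation of the scenario, as a sketch only: `SamplerDuplicateSketch`, `addToIndex`, and the string "rows" are all hypothetical stand-ins (an `IllegalArgumentException` plays the role of `ParseException`), and the sketch does not model removing the already-indexed first row from `index`. `sampleAsWritten` mirrors the shape of the loop in this diff; `sampleAllOrNothing` is one possible way to report the whole batch as unparseable, assuming that is what matching `JsonReader` means here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation; strings stand in for rawColumns and response rows.
class SamplerDuplicateSketch
{
  // Stand-in for index.add(): rejects the row "bad".
  static void addToIndex(String row)
  {
    if ("bad".equals(row)) {
      throw new IllegalArgumentException("unparseable: " + row);
    }
  }

  // Shape of the loop under review: rows are appended one by one, and on
  // failure every raw row in the batch is appended again as unparseable.
  static List<String> sampleAsWritten(List<String> rawRows)
  {
    List<String> responseRows = new ArrayList<>();
    try {
      for (String row : rawRows) {
        addToIndex(row);
        responseRows.add(row + ":ok");
      }
    }
    catch (IllegalArgumentException e) {
      for (String row : rawRows) {
        responseRows.add(row + ":unparseable");
      }
    }
    return responseRows;
  }

  // One possible alternative: stage the successes and discard them on
  // failure, so each raw row is reported exactly once.
  static List<String> sampleAllOrNothing(List<String> rawRows)
  {
    List<String> staged = new ArrayList<>();
    try {
      for (String row : rawRows) {
        addToIndex(row);
        staged.add(row + ":ok");
      }
      return staged;
    }
    catch (IllegalArgumentException e) {
      List<String> failed = new ArrayList<>();
      for (String row : rawRows) {
        failed.add(row + ":unparseable");
      }
      return failed;
    }
  }

  public static void main(String[] args)
  {
    List<String> rows = Arrays.asList("a", "bad", "c");
    System.out.println(sampleAsWritten(rows));    // "a" is reported twice
    System.out.println(sampleAllOrNothing(rows)); // each row reported once
  }
}
```

For the three-row input, `sampleAsWritten` returns four entries (`a:ok` followed by all three rows marked unparseable), while `sampleAllOrNothing` returns three.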

##########
File path: 
core/src/main/java/org/apache/druid/data/input/InputRowListPlusRawValues.java
##########
@@ -53,21 +54,59 @@ public static InputRowListPlusRawValues of(@Nullable InputRow inputRow, Map<Stri
 
   public static InputRowListPlusRawValues of(@Nullable List<InputRow> inputRows, Map<String, Object> rawColumns)
   {
-    return new InputRowListPlusRawValues(inputRows, Preconditions.checkNotNull(rawColumns, "rawColumns"), null);
+    return new InputRowListPlusRawValues(inputRows,
+                                         Collections.singletonList(Preconditions.checkNotNull(rawColumns, "rawColumns")),
+                                         null);
   }
 
   public static InputRowListPlusRawValues of(@Nullable Map<String, Object> rawColumns, ParseException parseException)
   {
     return new InputRowListPlusRawValues(
         null,
-        rawColumns,
+        rawColumns == null ? null : Collections.singletonList(rawColumns),
         Preconditions.checkNotNull(parseException, "parseException")
     );
   }
 
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList, ParseException parseException)
+  {
+    return ofList(rawColumnsList, null, parseException);
+  }
+
+  /**
+   * Create an instance of {@link InputRowListPlusRawValues}
+   *
+   * Make sure the size of given rawColumnsList and inputRows are the same if both of them are not null
+   */
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList,
+                                                 @Nullable List<InputRow> inputRows)
+  {
+    return ofList(rawColumnsList, inputRows, null);
+  }
+
+  /**
+   * Create an instance of {@link InputRowListPlusRawValues}
+   *
+   * Make sure the size of given rawColumnsList and inputRows are the same if both of them are not null
+   */
+  public static InputRowListPlusRawValues ofList(@Nullable List<Map<String, Object>> rawColumnsList,
+                                                 @Nullable List<InputRow> inputRows,
+                                                 ParseException parseException)

Review comment:
       `parseException` is nullable here (the two-argument `ofList` above passes 
`null`), so it should be annotated with `@Nullable`.






