LakshSingla commented on a change in pull request #12259:
URL: https://github.com/apache/druid/pull/12259#discussion_r811445052



##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -93,51 +111,126 @@ public InputRow next()
       @Override
       public void close() throws IOException
       {
-        intermediateRowIterator.close();
+        intermediateRowIteratorWithMetadata.close();
       }
     };
   }
 
   @Override
   public CloseableIterator<InputRowListPlusRawValues> sample() throws 
IOException
   {
-    return intermediateRowIterator().map(row -> {
 
-      final List<Map<String, Object>> rawColumnsList;
-      try {
-        rawColumnsList = toMap(row);
-      }
-      catch (Exception e) {
-        return InputRowListPlusRawValues.of(null,
-                                            new 
ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", 
row));
-      }
+    final CloseableIteratorWithMetadata<T> delegate = 
intermediateRowIteratorWithMetadata();
+    final BiFunction<T, Map<String, Object>, InputRowListPlusRawValues> 
samplingFunction =
+        (row, metadata) -> {
 
-      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
-        return InputRowListPlusRawValues.of(null,
-                                            new 
ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
-      }
+          final List<Map<String, Object>> rawColumnsList;
+          try {
+            rawColumnsList = toMap(row);
+          }
+          catch (Exception e) {
+            return InputRowListPlusRawValues.of(
+                null,
+                new ParseException(String.valueOf(row), e, 
buildParseExceptionMessage(
+                    StringUtils.nonStrictFormat("Unable to parse row [%s] into 
JSON", row),
+                    source(),
+                    null,
+                    metadata
+                ))
+            );
+          }
 
-      List<InputRow> rows;
-      try {
-        rows = parseInputRows(row);
-      }
-      catch (ParseException e) {
-        return InputRowListPlusRawValues.ofList(rawColumnsList, e);
+          if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+            return InputRowListPlusRawValues.of(
+                null,
+                new ParseException(String.valueOf(row), 
buildParseExceptionMessage(
+                    StringUtils.nonStrictFormat("No map object parsed for row 
[%s]", row),
+                    source(),
+                    null,
+                    metadata
+                ))
+            );
+          }
+
+          List<InputRow> rows;
+          try {
+            rows = parseInputRows(row);
+          }
+          catch (ParseException e) {
+            return InputRowListPlusRawValues.ofList(rawColumnsList, new 
ParseException(
+                String.valueOf(row),
+                e,
+                buildParseExceptionMessage(e.getMessage(), source(), null, 
metadata)
+            ));
+          }
+          catch (IOException e) {
+            ParseException exception = new ParseException(String.valueOf(row), 
e, buildParseExceptionMessage(
+                StringUtils.nonStrictFormat("Unable to parse row [%s] into 
inputRow", row),
+                source(),
+                null,
+                metadata
+            ));
+            return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+          }
+
+          return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
+        };
+
+    return new CloseableIterator<InputRowListPlusRawValues>()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
       }
-      catch (IOException e) {
-        ParseException exception = new ParseException(String.valueOf(row), e, 
"Unable to parse row [%s] into inputRow", row);
-        return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+
+      @Override
+      public boolean hasNext()
+      {
+        return delegate.hasNext();
       }
 
-      return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
-    });
+      @Override
+      public InputRowListPlusRawValues next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        return samplingFunction.apply(delegate.next(), delegate.metadata());
+      }
+    };
   }
 
   /**
    * Creates an iterator of intermediate rows. The returned rows will be 
consumed by {@link #parseInputRows} and
-   * {@link #toMap}.
+   * {@link #toMap}. Either this or {@link 
#intermediateRowIteratorWithMetadata()} should be implemented
+   */
+  protected CloseableIterator<T> intermediateRowIterator() throws IOException
+  {
+    throw new UnsupportedEncodingException("intermediateRowIterator not 
implemented");

Review comment:
       This was added accidentally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to