abhishekagarwal87 commented on a change in pull request #12259:
URL: https://github.com/apache/druid/pull/12259#discussion_r810817086



##########
File path: core/src/main/java/org/apache/druid/data/input/TextReader.java
##########
@@ -65,8 +68,17 @@ public InputRowSchema getInputRowSchema()
       processHeaderLine(delegate.nextLine());
     }
 
-    return new CloseableIterator<String>()
+    return new CloseableIteratorWithMetadata<String>()
     {
+      private final long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
+      final Map<String, Object> metadata = new HashMap<>(ImmutableMap.of("lineNumber", currentLineNumber));

Review comment:
       "lineNumber" can be just "line" and should be declared as a constant. 

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -155,6 +248,32 @@ public void close() throws IOException
    */
   protected abstract List<Map<String, Object>> toMap(T intermediateRow) throws IOException;
 
+  private String buildParseExceptionMessage(
+      String baseExceptionMessage,
+      @Nullable InputEntity source,
+      @Nullable Long recordNumber,
+      @Nullable Map<String, Object> metadata
+  )
+  {
+    StringBuilder sb = new StringBuilder(baseExceptionMessage);
+    List<String> temp = new ArrayList<>();
+    if (source != null && source.getUri() != null) {
+      temp.add(StringUtils.format("Source info:[%s]", source.getUri()));
+    }
+    if (recordNumber != null) {
+      temp.add(StringUtils.format("Record number:[%d]", recordNumber));
+    }
+    if (metadata != null && !metadata.isEmpty()) {
+      temp.add(StringUtils.format("Additional info:%s", metadata));

Review comment:
       I think it should be formatted like this:
   "Source info: [Path: source.getUri(), Record number: recordNumber, key1: value1, key2: value2]"
   
   where key1 and key2 are metadata entries.
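A minimal sketch of a message builder that produces the suggested layout. It is not the PR's code. The `InputEntity` argument is replaced by a plain `java.net.URI`, and Druid's `StringUtils.format` is replaced by string concatenation. The single space that joins the base message to the `Source info` part is an assumption.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ParseExceptionMessages
{
  /**
   * Hypothetical helper: appends "Source info: [Path: ..., Record number: ..., key: value, ...]"
   * to the base message, skipping every piece that is null. Metadata entries are appended
   * in the map's iteration order.
   */
  static String buildParseExceptionMessage(
      String baseMessage,
      URI sourceUri,
      Long recordNumber,
      Map<String, Object> metadata
  )
  {
    List<String> parts = new ArrayList<>();
    if (sourceUri != null) {
      parts.add("Path: " + sourceUri);
    }
    if (recordNumber != null) {
      parts.add("Record number: " + recordNumber);
    }
    if (metadata != null) {
      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
        parts.add(entry.getKey() + ": " + entry.getValue());
      }
    }
    if (parts.isEmpty()) {
      // Nothing to add; keep the original message untouched.
      return baseMessage;
    }
    return baseMessage + " Source info: [" + String.join(", ", parts) + "]";
  }

  public static void main(String[] args)
  {
    Map<String, Object> metadata = new LinkedHashMap<>();
    metadata.put("line", 7L);
    // Prints: Unable to parse row [a,b] into JSON Source info: [Path: file:/tmp/data.csv, line: 7]
    System.out.println(buildParseExceptionMessage(
        "Unable to parse row [a,b] into JSON",
        URI.create("file:/tmp/data.csv"),
        null,
        metadata
    ));
  }
}
```

All location details end up inside one bracketed group, which is what the proposed format implies. Null arguments and empty metadata simply drop out.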

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();
+
+  static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)

Review comment:
       Please add some description here. 

##########
File path: core/src/main/java/org/apache/druid/data/input/TextReader.java
##########
@@ -65,8 +68,17 @@ public InputRowSchema getInputRowSchema()
       processHeaderLine(delegate.nextLine());
     }
 
-    return new CloseableIterator<String>()
+    return new CloseableIteratorWithMetadata<String>()
     {
+      private final long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
+      final Map<String, Object> metadata = new HashMap<>(ImmutableMap.of("lineNumber", currentLineNumber));

Review comment:
       Instead of keeping a map around, you can just keep the line number and return it inside a map whenever `metadata()` is called.
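A minimal sketch of both suggestions on this hunk: a constant `"line"` key, and a primitive line counter instead of a long-lived map. It assumes a simplified iterator over lines that were already read, whereas the real `TextReader` wraps a line iterator over the input. The class name `LineNumberingIterator` and the constant `LINE_KEY` are hypothetical. The method is called `currentMetadata()`, following the rename proposed elsewhere in this review.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

class LineNumberingIterator implements Iterator<String>
{
  /** Shared key, so producers and consumers cannot disagree on its spelling. */
  static final String LINE_KEY = "line";

  private final Iterator<String> delegate;
  // Line number of the value most recently returned by next(); only a primitive is kept.
  private long currentLineNumber;

  /**
   * @param linesAlreadyConsumed number of lines (for example, header lines) read before this iterator starts
   */
  LineNumberingIterator(Iterator<String> delegate, long linesAlreadyConsumed)
  {
    this.delegate = delegate;
    this.currentLineNumber = linesAlreadyConsumed;
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public String next()
  {
    // delegate.next() throws NoSuchElementException when exhausted,
    // so the counter only advances when a line was actually read.
    String line = delegate.next();
    currentLineNumber++;
    return line;
  }

  /** Builds a fresh read-only map on demand instead of mutating a shared HashMap. */
  Map<String, Object> currentMetadata()
  {
    return Collections.singletonMap(LINE_KEY, currentLineNumber);
  }

  public static void main(String[] args)
  {
    // One header line was already consumed, so the first data line is line 2.
    LineNumberingIterator it = new LineNumberingIterator(Arrays.asList("a,b", "c,d").iterator(), 1);
    while (it.hasNext()) {
      String line = it.next();
      // Prints "a,b -> {line=2}", then "c,d -> {line=3}".
      System.out.println(line + " -> " + it.currentMetadata());
    }
  }
}
```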

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();

Review comment:
       This method should be called currentMetadata()

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();
+
+  static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)

Review comment:
       this is similar to CloseableIterator.withEmptyBaggage.

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();
+
+  static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)

Review comment:
       ```suggestion
     static <T> CloseableIteratorWithMetadata<T> withEmptyMetadata(CloseableIterator<T> delegate)
   ```

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();

Review comment:
       Please add javadocs for this method too, including what implementors should put in the result map.

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -93,51 +111,126 @@ public InputRow next()
       @Override
       public void close() throws IOException
       {
-        intermediateRowIterator.close();
+        intermediateRowIteratorWithMetadata.close();
       }
     };
   }
 
   @Override
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
-    return intermediateRowIterator().map(row -> {
 
-      final List<Map<String, Object>> rawColumnsList;
-      try {
-        rawColumnsList = toMap(row);
-      }
-      catch (Exception e) {
-        return InputRowListPlusRawValues.of(null,
-                                            new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
-      }
+    final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
+    final BiFunction<T, Map<String, Object>, InputRowListPlusRawValues> samplingFunction =
+        (row, metadata) -> {
 
-      if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
-        return InputRowListPlusRawValues.of(null,
-                                            new ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
-      }
+          final List<Map<String, Object>> rawColumnsList;
+          try {
+            rawColumnsList = toMap(row);
+          }
+          catch (Exception e) {
+            return InputRowListPlusRawValues.of(
+                null,
+                new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+                    StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row),
+                    source(),
+                    null,
+                    metadata
+                ))
+            );
+          }
 
-      List<InputRow> rows;
-      try {
-        rows = parseInputRows(row);
-      }
-      catch (ParseException e) {
-        return InputRowListPlusRawValues.ofList(rawColumnsList, e);
+          if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+            return InputRowListPlusRawValues.of(
+                null,
+                new ParseException(String.valueOf(row), buildParseExceptionMessage(
+                    StringUtils.nonStrictFormat("No map object parsed for row [%s]", row),
+                    source(),
+                    null,
+                    metadata
+                ))
+            );
+          }
+
+          List<InputRow> rows;
+          try {
+            rows = parseInputRows(row);
+          }
+          catch (ParseException e) {
+            return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException(
+                String.valueOf(row),
+                e,
+                buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
+            ));
+          }
+          catch (IOException e) {
+            ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+                StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row),
+                source(),
+                null,
+                metadata
+            ));
+            return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+          }
+
+          return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
+        };
+
+    return new CloseableIterator<InputRowListPlusRawValues>()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
       }
-      catch (IOException e) {
-        ParseException exception = new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into inputRow", row);
-        return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+
+      @Override
+      public boolean hasNext()
+      {
+        return delegate.hasNext();
       }
 
-      return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
-    });
+      @Override
+      public InputRowListPlusRawValues next()
+      {
+        if (!hasNext()) {
+          throw new NoSuchElementException();
+        }
+
+        return samplingFunction.apply(delegate.next(), delegate.metadata());
+      }
+    };
   }
 
   /**
    * Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
-   * {@link #toMap}.
+   * {@link #toMap}. Either this or {@link #intermediateRowIteratorWithMetadata()} should be implemented
+   */
+  protected CloseableIterator<T> intermediateRowIterator() throws IOException
+  {
+    throw new UnsupportedEncodingException("intermediateRowIterator not implemented");

Review comment:
       UnsupportedEncodingException doesn't look right. It should be UnsupportedOperationException here.
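A minimal sketch of the suggested fix. It uses a hypothetical, stripped-down base class (`RowReaderSketch`) instead of the real `IntermediateRowParsingReader`, and plain `Iterator` stands in for `CloseableIterator`. The fallback from the metadata variant to the plain variant is an assumption about how the two hooks relate. The original line compiles only because `UnsupportedEncodingException` is a subclass of `IOException`, which the method declares. `UnsupportedOperationException` is the unchecked exception conventionally used when an optional operation is not provided.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

abstract class RowReaderSketch<T>
{
  /** Either this or intermediateRowIteratorWithMetadata() should be overridden. */
  protected Iterator<T> intermediateRowIterator() throws IOException
  {
    // Unchecked, and plainly means "not implemented". The original UnsupportedEncodingException
    // compiled only because it extends IOException, and it suggests a charset problem instead.
    throw new UnsupportedOperationException("intermediateRowIterator not implemented");
  }

  /** Hypothetical default: fall back to the plain iterator when not overridden. */
  protected Iterator<T> intermediateRowIteratorWithMetadata() throws IOException
  {
    return intermediateRowIterator();
  }
}

class ListRowReader extends RowReaderSketch<String>
{
  @Override
  protected Iterator<String> intermediateRowIterator()
  {
    return Arrays.asList("row1", "row2").iterator();
  }

  public static void main(String[] args) throws IOException
  {
    // Overrides one hook, so the default implementation of the other one works.
    System.out.println(new ListRowReader().intermediateRowIteratorWithMetadata().next());

    // Overrides neither hook, so callers get an UnsupportedOperationException.
    try {
      new RowReaderSketch<String>() {}.intermediateRowIteratorWithMetadata();
    }
    catch (UnsupportedOperationException e) {
      System.out.println("caught: " + e.getMessage());
    }
  }
}
```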

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()

Review comment:
       ```suggestion
    * about the source of last value returned by next()
   ```

##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -93,51 +111,126 @@ public InputRow next()
       @Override
       public void close() throws IOException
       {
-        intermediateRowIterator.close();
+        intermediateRowIteratorWithMetadata.close();
       }
     };
   }
 
   @Override
   public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
   {
-    return intermediateRowIterator().map(row -> {
 
-      final List<Map<String, Object>> rawColumnsList;
-      try {
-        rawColumnsList = toMap(row);
-      }
-      catch (Exception e) {
-        return InputRowListPlusRawValues.of(null,
-                                            new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
-      }
+    final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
+    final BiFunction<T, Map<String, Object>, InputRowListPlusRawValues> samplingFunction =
+        (row, metadata) -> {

Review comment:
       Instead of this big lambda, can you create a static method and pass it here as a method reference?
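A minimal sketch of the refactoring, using hypothetical stand-ins. `String` replaces `InputRowListPlusRawValues`, and a single-map `toMap` replaces the real list-returning one. The lambda in the diff calls instance methods (`toMap()`, `parseInputRows()`, `source()`), so a private instance method passed as `this::sampleRow` is the closest fit. A truly static method would need the reader passed to it explicitly.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.BiFunction;

abstract class SamplerSketch<T>
{
  /** Hypothetical stand-in for toMap(); the real method returns a list of maps. */
  protected abstract Map<String, Object> toMap(T row);

  /**
   * The former lambda body as a named method. It is an instance method because it calls
   * toMap(); a static variant would need the reader passed in as an extra argument.
   */
  private String sampleRow(T row, Map<String, Object> metadata)
  {
    final Map<String, Object> rawColumns;
    try {
      rawColumns = toMap(row);
    }
    catch (RuntimeException e) {
      // Error path: report the row together with its per-row metadata.
      return "Unable to parse row [" + row + "] " + metadata;
    }
    return "Parsed " + rawColumns;
  }

  /** The call site shrinks to a method reference. */
  BiFunction<T, Map<String, Object>, String> samplingFunction()
  {
    return this::sampleRow;
  }
}

class StringSampler extends SamplerSketch<String>
{
  @Override
  protected Map<String, Object> toMap(String row)
  {
    if (row.isEmpty()) {
      throw new IllegalArgumentException("empty row");
    }
    return Collections.singletonMap("value", row);
  }

  public static void main(String[] args)
  {
    BiFunction<String, Map<String, Object>, String> fn = new StringSampler().samplingFunction();
    System.out.println(fn.apply("a", Collections.emptyMap()));               // Parsed {value=a}
    System.out.println(fn.apply("", Collections.singletonMap("line", 4L)));  // Unable to parse row [] {line=4}
  }
}
```

Besides shortening `sample()`, the named method shows up as `sampleRow` in stack traces instead of a synthetic `lambda$...` frame.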

##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+  Map<String, Object> metadata();
+
+  static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)
+  {
+    return new CloseableIteratorWithMetadata<T>()
+    {
+
+      @Override
+      public Map<String, Object> metadata()
+      {
+        return Collections.emptyMap();
+      }
+
+      @Override
+      public void close() throws IOException
+      {
+        delegate.close();
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return delegate.hasNext();
+      }
+
+      @Override
+      public T next()
+      {
+        if (!hasNext()) {

Review comment:
       you can directly call delegate.next() here. 
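A minimal sketch that combines the interface-level requests in this review:

- a javadoc on the metadata method that says what implementors should return;
- the name `currentMetadata()`;
- a factory named `withEmptyMetadata` with a description;
- a `next()` that delegates directly.

It uses hypothetical stand-in interfaces (`SimpleCloseableIterator`, `SimpleCloseableIteratorWithMetadata`) so that it compiles without Druid. It is not the PR's final code.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

/** Hypothetical stand-in for CloseableIterator: an Iterator that must be closed. */
interface SimpleCloseableIterator<T> extends Iterator<T>, Closeable
{
}

interface SimpleCloseableIteratorWithMetadata<T> extends SimpleCloseableIterator<T>
{
  /**
   * Returns read-only, per-row context about the source of the value most recently
   * returned by next(), to help pinpoint the cause of a parse error. Implementors should
   * put only row-level locators here, such as a line number for text input or an offset
   * for a stream; source-level details such as the input URI belong elsewhere.
   * Returns an empty map when there is nothing to report.
   */
  Map<String, Object> currentMetadata();

  /**
   * Adapts an iterator that has no per-row metadata: currentMetadata() always returns an
   * empty map, and every other call, including close(), is forwarded to delegate.
   */
  static <T> SimpleCloseableIteratorWithMetadata<T> withEmptyMetadata(SimpleCloseableIterator<T> delegate)
  {
    return new SimpleCloseableIteratorWithMetadata<T>()
    {
      @Override
      public Map<String, Object> currentMetadata()
      {
        return Collections.emptyMap();
      }

      @Override
      public boolean hasNext()
      {
        return delegate.hasNext();
      }

      @Override
      public T next()
      {
        // The Iterator contract already makes delegate.next() throw
        // NoSuchElementException when exhausted, so no hasNext() check is needed.
        return delegate.next();
      }

      @Override
      public void close() throws IOException
      {
        delegate.close();
      }
    };
  }
}

class EmptyMetadataDemo
{
  /** Wraps a plain Iterator with a no-op close(), for the demo only. */
  static <T> SimpleCloseableIterator<T> fromIterator(Iterator<T> it)
  {
    return new SimpleCloseableIterator<T>()
    {
      @Override
      public boolean hasNext()
      {
        return it.hasNext();
      }

      @Override
      public T next()
      {
        return it.next();
      }

      @Override
      public void close()
      {
      }
    };
  }

  public static void main(String[] args) throws IOException
  {
    SimpleCloseableIterator<String> plain = fromIterator(Arrays.asList("x", "y").iterator());
    try (SimpleCloseableIteratorWithMetadata<String> rows = SimpleCloseableIteratorWithMetadata.withEmptyMetadata(plain)) {
      while (rows.hasNext()) {
        System.out.println(rows.next() + " " + rows.currentMetadata());
      }
    }
  }
}
```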




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
