abhishekagarwal87 commented on a change in pull request #12259:
URL: https://github.com/apache/druid/pull/12259#discussion_r810817086
##########
File path: core/src/main/java/org/apache/druid/data/input/TextReader.java
##########
@@ -65,8 +68,17 @@ public InputRowSchema getInputRowSchema()
processHeaderLine(delegate.nextLine());
}
- return new CloseableIterator<String>()
+ return new CloseableIteratorWithMetadata<String>()
{
+ private final long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
+ final Map<String, Object> metadata = new HashMap<>(ImmutableMap.of("lineNumber", currentLineNumber));
Review comment:
"lineNumber" can be just "line" and should be declared as a constant.
##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -155,6 +248,32 @@ public void close() throws IOException
*/
protected abstract List<Map<String, Object>> toMap(T intermediateRow) throws IOException;
+ private String buildParseExceptionMessage(
+ String baseExceptionMessage,
+ @Nullable InputEntity source,
+ @Nullable Long recordNumber,
+ @Nullable Map<String, Object> metadata
+ )
+ {
+ StringBuilder sb = new StringBuilder(baseExceptionMessage);
+ List<String> temp = new ArrayList<>();
+ if (source != null && source.getUri() != null) {
+ temp.add(StringUtils.format("Source info:[%s]", source.getUri()));
+ }
+ if (recordNumber != null) {
+ temp.add(StringUtils.format("Record number:[%d]", recordNumber));
+ }
+ if (metadata != null && !metadata.isEmpty()) {
+ temp.add(StringUtils.format("Additional info:%s", metadata));
Review comment:
This is how I think it should be formatted:
"Source info: [Path: source.getUri(), Record number: recordNumber, key1: value1, key2: value2]"
where key1 and key2 are metadata entries.
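Below is a minimal Java sketch of that layout. It uses a plain `java.net.URI` in place of `InputEntity#getUri()`, and `ParseExceptionMessages` is a hypothetical helper, not part of Druid. The comment does not say how the base message and the bracketed source info should be joined, so the single space used here is an assumption.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper sketching the proposed layout:
// "<base> Source info: [Path: <uri>, Record number: <n>, key1: value1, ...]"
final class ParseExceptionMessages
{
  private ParseExceptionMessages()
  {
  }

  static String build(
      String baseExceptionMessage,
      URI sourceUri,                 // stands in for source.getUri(); may be null
      Long recordNumber,             // may be null
      Map<String, Object> metadata   // may be null or empty
  )
  {
    final List<String> parts = new ArrayList<>();
    if (sourceUri != null) {
      parts.add("Path: " + sourceUri);
    }
    if (recordNumber != null) {
      parts.add("Record number: " + recordNumber);
    }
    if (metadata != null) {
      // Each metadata entry becomes "key: value" inside the same brackets.
      for (Map.Entry<String, Object> entry : metadata.entrySet()) {
        parts.add(entry.getKey() + ": " + entry.getValue());
      }
    }
    if (parts.isEmpty()) {
      return baseExceptionMessage;
    }
    // Assumed separator: a single space between the base message and the source info.
    return baseExceptionMessage + " Source info: [" + String.join(", ", parts) + "]";
  }
}
```

Because all context goes into one bracketed list, the message stays on one line, and it degrades to the base message when nothing is known.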
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root cause of a parse exception.
+ * So it can include information that helps with such exercise. For example, for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be passed here so for kafka it could be an offset.
+ * The source information is already available via {@link IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
+
+ static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)
Review comment:
Please add some description here.
##########
File path: core/src/main/java/org/apache/druid/data/input/TextReader.java
##########
@@ -65,8 +68,17 @@ public InputRowSchema getInputRowSchema()
processHeaderLine(delegate.nextLine());
}
- return new CloseableIterator<String>()
+ return new CloseableIteratorWithMetadata<String>()
{
+ private final long currentLineNumber = numHeaderLines + (needsToProcessHeaderLine() ? 1 : 0);
+ final Map<String, Object> metadata = new HashMap<>(ImmutableMap.of("lineNumber", currentLineNumber));
Review comment:
Instead of keeping a map around, you can just keep the line number and return it inside a map whenever `metadata()` is called.
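Below is a minimal sketch of that suggestion. It also follows the earlier request for a `line` key declared as a constant. The iterator keeps only a `long` counter and builds the map on demand in `currentMetadata()`, which uses the rename suggested in this review. `LineNumberedIterator`, `LineMetadataIterator`, and the `BufferedReader` source are hypothetical stand-ins, not Druid's actual `TextReader` types.

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical stand-in for CloseableIteratorWithMetadata.
interface LineMetadataIterator<T> extends Iterator<T>, Closeable
{
  Map<String, Object> currentMetadata();
}

final class LineNumberedIterator implements LineMetadataIterator<String>
{
  // Metadata key declared once as a constant.
  static final String LINE_KEY = "line";

  private final BufferedReader reader;
  private String nextLine;
  // 1-based number of the line most recently returned by next(); starts at the
  // count of lines consumed before iteration (for example, header lines).
  private long lineNumber;

  LineNumberedIterator(BufferedReader reader, long linesAlreadyConsumed)
  {
    this.reader = reader;
    this.lineNumber = linesAlreadyConsumed;
    this.nextLine = readLine();
  }

  private String readLine()
  {
    try {
      return reader.readLine();
    }
    catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  @Override
  public boolean hasNext()
  {
    return nextLine != null;
  }

  @Override
  public String next()
  {
    if (nextLine == null) {
      throw new NoSuchElementException();
    }
    final String current = nextLine;
    lineNumber++;
    nextLine = readLine();
    return current;
  }

  @Override
  public Map<String, Object> currentMetadata()
  {
    // Built on demand, so there is no long-lived mutable map to keep in sync.
    return Collections.singletonMap(LINE_KEY, lineNumber);
  }

  @Override
  public void close() throws IOException
  {
    reader.close();
  }
}
```

A side benefit is that each returned map is an immutable snapshot. A caller that holds on to it will not see it change when the iterator advances.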
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
Review comment:
This method should be called `currentMetadata()`.
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
+
+ static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)
Review comment:
This is similar to `CloseableIterator.withEmptyBaggage`.
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
+
+ static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)
Review comment:
```suggestion
static <T> CloseableIteratorWithMetadata<T> withEmptyMetadata(CloseableIterator<T> delegate)
```
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
Review comment:
Please add a javadoc for this method too. What should an implementor put in this result map?
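Below is one possible shape for that javadoc. It assumes the method is renamed to `currentMetadata()` as suggested in this review. `DocumentedIteratorWithMetadata` and `IndexedListIterator` are hypothetical names. Druid's `CloseableIterator` is replaced by `Iterator` plus `Closeable` so the example compiles on its own. The wording draws on the interface's own class javadoc: per-row context only, such as a line number or a Kafka offset, and no source information.

```java
import java.io.Closeable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical stand-in for CloseableIteratorWithMetadata, used only to show the javadoc.
 */
interface DocumentedIteratorWithMetadata<T> extends Iterator<T>, Closeable
{
  /**
   * Returns read-only, per-row context about the value most recently returned by
   * {@link #next()}, to help pinpoint the cause of a parse exception. Examples: the
   * line number for a text file, or an offset for a stream such as Kafka. Source-level
   * details (for example the file path) are reported separately and should not be
   * repeated here. Implementations return an empty map, never null, when there is
   * nothing to report.
   */
  Map<String, Object> currentMetadata();
}

// Hypothetical implementor: reports the zero-based position of the last returned element.
final class IndexedListIterator<T> implements DocumentedIteratorWithMetadata<T>
{
  private final Iterator<T> delegate;
  private long index = -1; // stays -1 until next() is called for the first time

  IndexedListIterator(List<T> values)
  {
    this.delegate = values.iterator();
  }

  @Override
  public boolean hasNext()
  {
    return delegate.hasNext();
  }

  @Override
  public T next()
  {
    final T value = delegate.next();
    index++;
    return value;
  }

  @Override
  public Map<String, Object> currentMetadata()
  {
    if (index < 0) {
      return Collections.emptyMap();
    }
    return Collections.singletonMap("offset", index);
  }

  @Override
  public void close()
  {
    // Nothing to release for an in-memory list.
  }
}
```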
##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -93,51 +111,126 @@ public InputRow next()
@Override
public void close() throws IOException
{
- intermediateRowIterator.close();
+ intermediateRowIteratorWithMetadata.close();
}
};
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
- return intermediateRowIterator().map(row -> {
- final List<Map<String, Object>> rawColumnsList;
- try {
- rawColumnsList = toMap(row);
- }
- catch (Exception e) {
- return InputRowListPlusRawValues.of(null,
- new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
- }
+ final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
+ final BiFunction<T, Map<String, Object>, InputRowListPlusRawValues> samplingFunction =
+ (row, metadata) -> {
- if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
- return InputRowListPlusRawValues.of(null,
- new ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
- }
+ final List<Map<String, Object>> rawColumnsList;
+ try {
+ rawColumnsList = toMap(row);
+ }
+ catch (Exception e) {
+ return InputRowListPlusRawValues.of(
+ null,
+ new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("Unable to parse row [%s] into JSON", row),
+ source(),
+ null,
+ metadata
+ ))
+ );
+ }
- List<InputRow> rows;
- try {
- rows = parseInputRows(row);
- }
- catch (ParseException e) {
- return InputRowListPlusRawValues.ofList(rawColumnsList, e);
+ if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+ return InputRowListPlusRawValues.of(
+ null,
+ new ParseException(String.valueOf(row), buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("No map object parsed for row [%s]", row),
+ source(),
+ null,
+ metadata
+ ))
+ );
+ }
+
+ List<InputRow> rows;
+ try {
+ rows = parseInputRows(row);
+ }
+ catch (ParseException e) {
+ return InputRowListPlusRawValues.ofList(rawColumnsList, new ParseException(
+ String.valueOf(row),
+ e,
+ buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
+ ));
+ }
+ catch (IOException e) {
+ ParseException exception = new ParseException(String.valueOf(row), e, buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("Unable to parse row [%s] into inputRow", row),
+ source(),
+ null,
+ metadata
+ ));
+ return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+ }
+
+ return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
+ };
+
+ return new CloseableIterator<InputRowListPlusRawValues>()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
}
- catch (IOException e) {
- ParseException exception = new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into inputRow", row);
- return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+
+ @Override
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
}
- return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
- });
+ @Override
+ public InputRowListPlusRawValues next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ return samplingFunction.apply(delegate.next(), delegate.metadata());
+ }
+ };
}
/**
* Creates an iterator of intermediate rows. The returned rows will be consumed by {@link #parseInputRows} and
- * {@link #toMap}.
+ * {@link #toMap}. Either this or {@link #intermediateRowIteratorWithMetadata()} should be implemented
+ */
+ protected CloseableIterator<T> intermediateRowIterator() throws IOException
+ {
+ throw new UnsupportedEncodingException("intermediateRowIterator not implemented");
Review comment:
UnsupportedEncodingException doesn't look right. It should be
UnsupportedOperationException here.
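Below is a reduced sketch of the fix, using a hypothetical `EitherHookReader` with `Iterator` in place of Druid's iterator types. `UnsupportedOperationException` is unchecked and is the conventional signal that an optional operation is not implemented. `UnsupportedEncodingException` is a checked `IOException` subtype about unsupported character encodings. The fallback from `intermediateRowIteratorWithMetadata()` to `intermediateRowIterator()` is an assumption about how the two hooks relate, simplified here to return the plain iterator.

```java
import java.io.IOException;
import java.util.Iterator;

// Hypothetical, reduced stand-in for the reader's two "implement either one" hooks.
abstract class EitherHookReader<T>
{
  // Override this or intermediateRowIteratorWithMetadata().
  protected Iterator<T> intermediateRowIterator() throws IOException
  {
    // Unchecked, and the conventional signal for "optional operation not implemented".
    throw new UnsupportedOperationException("intermediateRowIterator not implemented");
  }

  // Assumed fallback: delegate to the plain hook (the real code would presumably
  // also attach empty metadata to each row).
  protected Iterator<T> intermediateRowIteratorWithMetadata() throws IOException
  {
    return intermediateRowIterator();
  }
}
```

A subclass that overrides neither hook fails fast with a clear message. A subclass that overrides `intermediateRowIterator()` gets the metadata variant for free.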
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+/**
+ * Like {@link CloseableIterator}, but has a metadata() method, which returns "metadata", which is effectively a Map<String, Object>
+ * about the last value returned by next()
Review comment:
```suggestion
* about the source of last value returned by next()
```
##########
File path: core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
##########
@@ -93,51 +111,126 @@ public InputRow next()
@Override
public void close() throws IOException
{
- intermediateRowIterator.close();
+ intermediateRowIteratorWithMetadata.close();
}
};
}
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
{
- return intermediateRowIterator().map(row -> {
- final List<Map<String, Object>> rawColumnsList;
- try {
- rawColumnsList = toMap(row);
- }
- catch (Exception e) {
- return InputRowListPlusRawValues.of(null,
- new ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON", row));
- }
+ final CloseableIteratorWithMetadata<T> delegate = intermediateRowIteratorWithMetadata();
+ final BiFunction<T, Map<String, Object>, InputRowListPlusRawValues> samplingFunction =
+ (row, metadata) -> {
Review comment:
Instead of this big lambda, can you create a static method and pass it here as a method reference?
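Below is a sketch of that refactor with simplified, hypothetical types (`RowIteratorWithMetadata`, `SamplingSketch`). The lambda body moves into a named method and is passed as `SamplingSketch::sampleRow`. The real lambda calls instance methods such as `toMap`, `parseInputRows`, and `source()`. A static method would therefore need those passed in, and an instance method reference such as `this::sampleIntermediateRow` (a hypothetical name) may fit more naturally.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical stand-in for CloseableIteratorWithMetadata (close() omitted for brevity).
interface RowIteratorWithMetadata<T> extends Iterator<T>
{
  Map<String, Object> currentMetadata();
}

final class SamplingSketch
{
  private SamplingSketch()
  {
  }

  // The former lambda body as a named method. The real one would call toMap(),
  // parseInputRows() and buildParseExceptionMessage(); this one is deliberately trivial.
  static String sampleRow(String row, Map<String, Object> metadata)
  {
    if (row.isEmpty()) {
      return "No map object parsed for row [" + row + "] " + metadata;
    }
    return "parsed: " + row;
  }

  // Applies fn to each row together with the metadata read right after next().
  static <T, R> Iterator<R> mapWithMetadata(
      RowIteratorWithMetadata<T> delegate,
      BiFunction<T, Map<String, Object>, R> fn
  )
  {
    return new Iterator<R>()
    {
      @Override
      public boolean hasNext()
      {
        return delegate.hasNext();
      }

      @Override
      public R next()
      {
        // Assumes the delegate throws NoSuchElementException when exhausted.
        final T row = delegate.next();
        return fn.apply(row, delegate.currentMetadata());
      }
    };
  }
}
```

Usage would then read `SamplingSketch.mapWithMetadata(delegate, SamplingSketch::sampleRow)`. The per-row logic can also be unit-tested on its own, without an iterator.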
##########
File path: core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
##########
@@ -0,0 +1,77 @@
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+ Map<String, Object> metadata();
+
+ static <T> CloseableIteratorWithMetadata<T> fromCloseableIterator(CloseableIterator<T> delegate)
+ {
+ return new CloseableIteratorWithMetadata<T>()
+ {
+
+ @Override
+ public Map<String, Object> metadata()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public T next()
+ {
+ if (!hasNext()) {
Review comment:
You can call `delegate.next()` directly here.
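This sketch combines the review comments on this factory method: a description, the `withEmptyMetadata` name, and calling `delegate.next()` directly. `SimpleCloseableIterator` and `SimpleIteratorWithMetadata` are hypothetical stand-ins for Druid's interfaces. Dropping the `hasNext()` guard assumes the delegate honors the `Iterator` contract by throwing `NoSuchElementException` once exhausted.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;

// Simplified, hypothetical stand-ins for CloseableIterator / CloseableIteratorWithMetadata.
interface SimpleCloseableIterator<T> extends Iterator<T>, Closeable
{
}

interface SimpleIteratorWithMetadata<T> extends SimpleCloseableIterator<T>
{
  Map<String, Object> currentMetadata();

  /**
   * Adapts a plain iterator to this interface for readers that have no per-row
   * context to report: currentMetadata() always returns an empty map.
   */
  static <T> SimpleIteratorWithMetadata<T> withEmptyMetadata(SimpleCloseableIterator<T> delegate)
  {
    return new SimpleIteratorWithMetadata<T>()
    {
      @Override
      public Map<String, Object> currentMetadata()
      {
        return Collections.emptyMap();
      }

      @Override
      public boolean hasNext()
      {
        return delegate.hasNext();
      }

      @Override
      public T next()
      {
        // No hasNext() guard: a contract-abiding delegate throws NoSuchElementException itself.
        return delegate.next();
      }

      @Override
      public void close() throws IOException
      {
        delegate.close();
      }
    };
  }
}
```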