This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 3f709db Make ParseExceptions more informative (#12259)
3f709db is described below
commit 3f709db173d779db1466e57a1564a5f557b4b0cf
Author: Laksh Singla <[email protected]>
AuthorDate: Mon Feb 28 22:31:15 2022 +0530
Make ParseExceptions more informative (#12259)
This PR makes the ParseExceptions in Druid more informative by attaching
additional information (metadata) about the exception to the ParseException.
For example: the path of the file generating the issue, or the line number
(where it can be easily fetched, as in CsvReader).
The following changes are made in this PR:
A new interface CloseableIteratorWithMetadata has been created which is like
CloseableIterator but also has a currentMetadata() method that returns a
context Map<String, Object> about the last element returned by next().
IntermediateRowParsingReader#read() now attaches the InputEntity and the
"record number" of the record that caused the exception while being parsed,
and IntermediateRowParsingReader#sample() attaches the InputEntity (but not
the "record number").
TextReader (and its subclasses), which is a specific implementation of
IntermediateRowParsingReader, also includes the line number of the line that
caused the error.
This will also help in triaging issues when InputSourceReader generates a
ParseException, because the exception can point to the specific InputEntity
that caused it (while trying to read it).
---
.../data/input/IntermediateRowParsingReader.java | 199 +++++++++++++++++----
.../org/apache/druid/data/input/TextReader.java | 25 ++-
.../apache/druid/data/input/impl/JsonReader.java | 6 +
.../parsers/CloseableIteratorWithMetadata.java | 81 +++++++++
.../druid/data/input/avro/AvroOCFReader.java | 6 +
.../druid/data/input/avro/AvroStreamReader.java | 7 +
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 10 +-
.../indexing/kinesis/KinesisIndexTaskTest.java | 12 +-
.../org/apache/druid/data/input/orc/OrcReader.java | 6 +
.../druid/data/input/parquet/ParquetReader.java | 6 +
.../druid/data/input/protobuf/ProtobufReader.java | 6 +
.../druid/indexing/input/DruidSegmentReader.java | 6 +
.../druid/indexing/common/task/IndexTaskTest.java | 161 +++++++++++------
.../parallel/SinglePhaseParallelIndexingTest.java | 12 +-
.../overlord/sampler/InputSourceSamplerTest.java | 45 +++--
.../org/apache/druid/metadata/input/SqlReader.java | 6 +
16 files changed, 479 insertions(+), 115 deletions(-)
diff --git
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
index 935c985..de0b58a 100644
---
a/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
+++
b/core/src/main/java/org/apache/druid/data/input/IntermediateRowParsingReader.java
@@ -19,10 +19,15 @@
package org.apache.druid.data.input;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.utils.CollectionUtils;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
@@ -41,7 +46,7 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
@Override
public CloseableIterator<InputRow> read() throws IOException
{
- final CloseableIterator<T> intermediateRowIterator =
intermediateRowIterator();
+ final CloseableIteratorWithMetadata<T> intermediateRowIteratorWithMetadata
= intermediateRowIteratorWithMetadata();
return new CloseableIterator<InputRow>()
{
@@ -52,28 +57,42 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
// good idea. Subclasses could implement read() with some duplicate
codes to avoid unnecessary iteration on
// a singleton list.
Iterator<InputRow> rows = null;
+ long currentRecordNumber = 1;
@Override
public boolean hasNext()
{
if (rows == null || !rows.hasNext()) {
- if (!intermediateRowIterator.hasNext()) {
+ if (!intermediateRowIteratorWithMetadata.hasNext()) {
return false;
}
- final T row = intermediateRowIterator.next();
+ final T row = intermediateRowIteratorWithMetadata.next();
try {
rows = parseInputRows(row).iterator();
+ ++currentRecordNumber;
}
catch (IOException e) {
+ final Map<String, Object> metadata =
intermediateRowIteratorWithMetadata.currentMetadata();
rows = new ExceptionThrowingIterator(new ParseException(
String.valueOf(row),
e,
- "Unable to parse row [%s]",
- row
+ buildParseExceptionMessage(
+ StringUtils.format("Unable to parse row [%s]", row),
+ source(),
+ currentRecordNumber,
+ metadata
+ )
));
}
catch (ParseException e) {
- rows = new ExceptionThrowingIterator(e);
+ final Map<String, Object> metadata =
intermediateRowIteratorWithMetadata.currentMetadata();
+ // Replace the message of the ParseException e
+ rows = new ExceptionThrowingIterator(
+ new ParseException(
+ e.getInput(),
+ e.isFromPartiallyValidRow(),
+ buildParseExceptionMessage(e.getMessage(), source(),
currentRecordNumber, metadata)
+ ));
}
}
@@ -93,7 +112,7 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
@Override
public void close() throws IOException
{
- intermediateRowIterator.close();
+ intermediateRowIteratorWithMetadata.close();
}
};
}
@@ -101,43 +120,128 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
@Override
public CloseableIterator<InputRowListPlusRawValues> sample() throws
IOException
{
- return intermediateRowIterator().map(row -> {
- final List<Map<String, Object>> rawColumnsList;
- try {
- rawColumnsList = toMap(row);
- }
- catch (Exception e) {
- return InputRowListPlusRawValues.of(null,
- new
ParseException(String.valueOf(row), e, "Unable to parse row [%s] into JSON",
row));
- }
+ final CloseableIteratorWithMetadata<T> delegate =
intermediateRowIteratorWithMetadata();
- if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
- return InputRowListPlusRawValues.of(null,
- new
ParseException(String.valueOf(row), "No map object parsed for row [%s]", row));
+ return new CloseableIterator<InputRowListPlusRawValues>()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
}
- List<InputRow> rows;
- try {
- rows = parseInputRows(row);
- }
- catch (ParseException e) {
- return InputRowListPlusRawValues.ofList(rawColumnsList, e);
+ @Override
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
}
- catch (IOException e) {
- ParseException exception = new ParseException(String.valueOf(row), e,
"Unable to parse row [%s] into inputRow", row);
- return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+
+ @Override
+ public InputRowListPlusRawValues next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ return sampleIntermediateRow(delegate.next(),
delegate.currentMetadata());
}
+ };
+ }
+
+ /**
+ * Parses and samples the intermediate row and returns input row and the raw
values in it. Metadata supplied can
+ * contain information about the source which will get surfaced in case an
exception occurs while parsing the
+ * intermediate row
+ *
+ * @param row intermediate row
+ * @param metadata additional information about the source and the record
getting parsed
+ * @return sampled data from the intermediate row
+ */
+ private InputRowListPlusRawValues sampleIntermediateRow(T row, Map<String,
Object> metadata)
+ {
+
+ final List<Map<String, Object>> rawColumnsList;
+ try {
+ rawColumnsList = toMap(row);
+ }
+ catch (Exception e) {
+ return InputRowListPlusRawValues.of(
+ null,
+ new ParseException(String.valueOf(row), e,
buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("Unable to parse row [%s] into
JSON", row),
+ source(),
+ null,
+ metadata
+ ))
+ );
+ }
- return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
- });
+ if (CollectionUtils.isNullOrEmpty(rawColumnsList)) {
+ return InputRowListPlusRawValues.of(
+ null,
+ new ParseException(String.valueOf(row), buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("No map object parsed for row [%s]",
row),
+ source(),
+ null,
+ metadata
+ ))
+ );
+ }
+
+ List<InputRow> rows;
+ try {
+ rows = parseInputRows(row);
+ }
+ catch (ParseException e) {
+ return InputRowListPlusRawValues.ofList(rawColumnsList, new
ParseException(
+ String.valueOf(row),
+ e,
+ buildParseExceptionMessage(e.getMessage(), source(), null, metadata)
+ ));
+ }
+ catch (IOException e) {
+ ParseException exception = new ParseException(String.valueOf(row), e,
buildParseExceptionMessage(
+ StringUtils.nonStrictFormat("Unable to parse row [%s] into
inputRow", row),
+ source(),
+ null,
+ metadata
+ ));
+ return InputRowListPlusRawValues.ofList(rawColumnsList, exception);
+ }
+
+ return InputRowListPlusRawValues.ofList(rawColumnsList, rows);
}
/**
* Creates an iterator of intermediate rows. The returned rows will be
consumed by {@link #parseInputRows} and
- * {@link #toMap}.
+ * {@link #toMap}. Either this or {@link
#intermediateRowIteratorWithMetadata()} should be implemented
+ */
+ protected CloseableIterator<T> intermediateRowIterator() throws IOException
+ {
+ throw new UOE("intermediateRowIterator not implemented");
+ }
+
+ /**
+ * Same as {@code intermediateRowIterator}, but it also contains the
metadata such as the line number to generate
+ * more informative {@link ParseException}.
*/
- protected abstract CloseableIterator<T> intermediateRowIterator() throws
IOException;
+ protected CloseableIteratorWithMetadata<T>
intermediateRowIteratorWithMetadata() throws IOException
+ {
+ return
CloseableIteratorWithMetadata.withEmptyMetadata(intermediateRowIterator());
+ }
+
+ /**
+ * @return InputEntity which the implementation is reading from. Useful in
generating informative {@link ParseException}s.
+ * For example, in case of {@link
org.apache.druid.data.input.impl.FileEntity}, file name containing erroneous
records
+ * or in case of {@link org.apache.druid.data.input.impl.HttpEntity}, the
endpoint containing the erroneous data can
+ * be attached to the error message
+ */
+ @Nullable
+ protected InputEntity source()
+ {
+ return null;
+ }
/**
* Parses the given intermediate row into a list of {@link InputRow}s.
@@ -155,6 +259,37 @@ public abstract class IntermediateRowParsingReader<T>
implements InputEntityRead
*/
protected abstract List<Map<String, Object>> toMap(T intermediateRow) throws
IOException;
+ /**
+ * A helper method which enriches the base parse exception message with
additional information. The returned message
+ * has a format: "baseExceptionMessage (key1: value1, key2: value2)" if
additional properties are present. Else it
+ * returns the baseException message without any modification
+ */
+ private static String buildParseExceptionMessage(
+ @Nonnull String baseExceptionMessage,
+ @Nullable InputEntity source,
+ @Nullable Long recordNumber,
+ @Nullable Map<String, Object> metadata
+ )
+ {
+ StringBuilder sb = new StringBuilder();
+ if (source != null && source.getUri() != null) {
+ sb.append(StringUtils.format("Path: %s, ", source.getUri()));
+ }
+ if (recordNumber != null) {
+ sb.append(StringUtils.format("Record: %d, ", recordNumber));
+ }
+ if (metadata != null) {
+ metadata.entrySet().stream()
+ .map(entry -> StringUtils.format("%s: %s, ", entry.getKey(),
entry.getValue().toString()))
+ .forEach(sb::append);
+ }
+ if (sb.length() == 0) {
+ return baseExceptionMessage;
+ }
+ sb.setLength(sb.length() - 2); // Erase the last stray ", "
+ return baseExceptionMessage + " (" + sb + ")"; // Wrap extra information
in a bracket before returning
+ }
+
private static class ExceptionThrowingIterator implements
CloseableIterator<InputRow>
{
private final Exception exception;
diff --git a/core/src/main/java/org/apache/druid/data/input/TextReader.java
b/core/src/main/java/org/apache/druid/data/input/TextReader.java
index 7214d32..e6f541c 100644
--- a/core/src/main/java/org/apache/druid/data/input/TextReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/TextReader.java
@@ -22,14 +22,16 @@ package org.apache.druid.data.input;
import com.google.common.base.Strings;
import org.apache.commons.io.LineIterator;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.CloseableIteratorWithMetadata;
import org.apache.druid.java.util.common.parsers.ParseException;
import org.apache.druid.java.util.common.parsers.ParserUtils;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
/**
* Abstract {@link InputEntityReader} for text format readers such as CSV or
JSON.
@@ -51,8 +53,7 @@ public abstract class TextReader extends
IntermediateRowParsingReader<String>
}
@Override
- public CloseableIterator<String> intermediateRowIterator()
- throws IOException
+ public CloseableIteratorWithMetadata<String>
intermediateRowIteratorWithMetadata() throws IOException
{
final LineIterator delegate = new LineIterator(
new InputStreamReader(source.open(), StringUtils.UTF8_STRING)
@@ -65,8 +66,17 @@ public abstract class TextReader extends
IntermediateRowParsingReader<String>
processHeaderLine(delegate.nextLine());
}
- return new CloseableIterator<String>()
+ return new CloseableIteratorWithMetadata<String>()
{
+ private static final String LINE_KEY = "Line";
+ private long currentLineNumber = numHeaderLines +
(needsToProcessHeaderLine() ? 1 : 0);
+
+ @Override
+ public Map<String, Object> currentMetadata()
+ {
+ return Collections.singletonMap(LINE_KEY, currentLineNumber);
+ }
+
@Override
public boolean hasNext()
{
@@ -76,6 +86,7 @@ public abstract class TextReader extends
IntermediateRowParsingReader<String>
@Override
public String next()
{
+ currentLineNumber++;
return delegate.nextLine();
}
@@ -87,6 +98,12 @@ public abstract class TextReader extends
IntermediateRowParsingReader<String>
};
}
+ @Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
/**
* Parses the given line into a list of {@link InputRow}s. Note that some
file formats can explode a single line of
* input into multiple inputRows.
diff --git
a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
index 0437103..8d0f667 100644
--- a/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
+++ b/core/src/main/java/org/apache/druid/data/input/impl/JsonReader.java
@@ -91,6 +91,12 @@ public class JsonReader extends
IntermediateRowParsingReader<String>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(String intermediateRow) throws
IOException, ParseException
{
final List<InputRow> inputRows;
diff --git
a/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
new file mode 100644
index 0000000..e1e4895
--- /dev/null
+++
b/core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIteratorWithMetadata.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.java.util.common.parsers;
+
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Like {@link CloseableIterator}, but has a currentMetadata() method, which
returns "metadata", which is effectively a Map<String, Object>
+ * about the source of last value returned by next()
+ *
+ * The returned metadata is read-only and cannot be modified.
+ *
+ * This metadata can be used as additional information to pin-point the root
cause of a parse exception.
+ * So it can include information that helps with such exercise. For example,
for a {@link org.apache.druid.data.input.TextReader}
+ * that information can be the line number. Only per row context needs to be
passed here so for kafka it could be an offset.
+ * The source information is already available via {@link
IntermediateRowParsingReader#source()} method and needn't be included
+ */
+public interface CloseableIteratorWithMetadata<T> extends CloseableIterator<T>
+{
+
+ /**
+ * @return A map containing the information about the source of the last
value returned by {@link #next()}
+ */
+ Map<String, Object> currentMetadata();
+
+ /**
+ * Creates an instance of CloseableIteratorWithMetadata from a {@link
CloseableIterator}. {@link #currentMetadata()}
+ * for the instance is guaranteed to return an empty map
+ */
+ static <T> CloseableIteratorWithMetadata<T>
withEmptyMetadata(CloseableIterator<T> delegate)
+ {
+ return new CloseableIteratorWithMetadata<T>()
+ {
+
+ @Override
+ public Map<String, Object> currentMetadata()
+ {
+ return Collections.emptyMap();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ delegate.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return delegate.hasNext();
+ }
+
+ @Override
+ public T next()
+ {
+ return delegate.next();
+ }
+ };
+ }
+}
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
index 09888f7..66552f9 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroOCFReader.java
@@ -112,6 +112,12 @@ public class AvroOCFReader extends
IntermediateRowParsingReader<GenericRecord>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(GenericRecord intermediateRow)
throws ParseException
{
return Collections.singletonList(
diff --git
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
index 50da0e5..4ed6a8a 100644
---
a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
+++
b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroStreamReader.java
@@ -72,6 +72,13 @@ public class AvroStreamReader extends
IntermediateRowParsingReader<GenericRecord
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+
+ @Override
protected List<InputRow> parseInputRows(GenericRecord intermediateRow)
throws ParseException
{
return Collections.singletonList(
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 629358f..94c0106 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -1579,9 +1579,9 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
"Unable to parse value[notanumber] for field[met1]",
"could not convert value [notanumber] to float",
"could not convert value [notanumber] to long",
- "Unable to parse [] as the intermediateRow resulted in empty input
row",
- "Unable to parse row [unparseable]",
- "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that
cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z,
dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
+ "Unable to parse [] as the intermediateRow resulted in empty input row
(Record: 1)",
+ "Unable to parse row [unparseable] (Record: 1)",
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that
cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z,
dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
@@ -1665,8 +1665,8 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
.get(RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = Arrays.asList(
- "Unable to parse [] as the intermediateRow resulted in empty input
row",
- "Unable to parse row [unparseable]"
+ "Unable to parse [] as the intermediateRow resulted in empty input row
(Record: 1)",
+ "Unable to parse row [unparseable] (Record: 1)"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index 8a90862..db7c927 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -1351,10 +1351,10 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
"Unable to parse value[notanumber] for field[met1]",
"could not convert value [notanumber] to float",
"could not convert value [notanumber] to long",
- "Timestamp[null] is unparseable! Event: {}",
- "Unable to parse [] as the intermediateRow resulted in empty input
row",
- "Unable to parse row [unparseable]",
- "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that
cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z,
dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}]"
+ "Timestamp[null] is unparseable! Event: {} (Record: 1)",
+ "Unable to parse [] as the intermediateRow resulted in empty input row
(Record: 1)",
+ "Unable to parse row [unparseable] (Record: 1)",
+ "Encountered row with timestamp[246140482-04-24T15:36:27.903Z] that
cannot be represented as a long: [{timestamp=246140482-04-24T15:36:27.903Z,
dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}] (Record: 1)"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
@@ -1457,8 +1457,8 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
.get(RowIngestionMeters.BUILD_SEGMENTS);
List<String> expectedMessages = Arrays.asList(
- "Unable to parse [] as the intermediateRow resulted in empty input
row",
- "Unable to parse row [unparseable]"
+ "Unable to parse [] as the intermediateRow resulted in empty input row
(Record: 1)",
+ "Unable to parse row [unparseable] (Record: 1)"
);
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
diff --git
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
index ca10988..5bcec8c 100644
---
a/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
+++
b/extensions-core/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcReader.java
@@ -144,6 +144,12 @@ public class OrcReader extends
IntermediateRowParsingReader<OrcStruct>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(OrcStruct intermediateRow) throws
ParseException
{
return Collections.singletonList(
diff --git
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
index bbd45f6..aced110 100644
---
a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
+++
b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/ParquetReader.java
@@ -133,6 +133,12 @@ public class ParquetReader extends
IntermediateRowParsingReader<Group>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(Group intermediateRow) throws
ParseException
{
return Collections.singletonList(
diff --git
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
index e24579f..d512d10 100644
---
a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
+++
b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufReader.java
@@ -85,6 +85,12 @@ public class ProtobufReader extends
IntermediateRowParsingReader<DynamicMessage>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(DynamicMessage intermediateRow)
throws ParseException, JsonProcessingException
{
Map<String, Object> record;
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
index 322526b..7117eea 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidSegmentReader.java
@@ -142,6 +142,12 @@ public class DruidSegmentReader extends
IntermediateRowParsingReader<Map<String,
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow)
throws ParseException
{
return Collections.singletonList(MapInputRowParser.parse(inputRowSchema,
intermediateRow));
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index b714947..9168f53 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -1323,6 +1323,7 @@ public class IndexTaskTest extends IngestionTestBase
// report parse exception
final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null,
null, null, false, true);
final IndexIngestionSpec indexIngestionSpec;
+ List<String> expectedMessages;
if (useInputFormatApi) {
indexIngestionSpec = createIngestionSpec(
jsonMapper,
@@ -1336,6 +1337,12 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = ImmutableList.of(
+ StringUtils.format(
+ "Timestamp[unparseable] is unparseable! Event:
{time=unparseable, d=a, val=1} (Path: %s, Record: 1, Line: 2)",
+ tmpFile.toURI()
+ )
+ );
} else {
indexIngestionSpec = createIngestionSpec(
jsonMapper,
@@ -1347,6 +1354,9 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = ImmutableList.of(
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
d=a, val=1}"
+ );
}
IndexTask indexTask = new IndexTask(
@@ -1366,9 +1376,6 @@ public class IndexTaskTest extends IngestionTestBase
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
- List<String> expectedMessages = ImmutableList.of(
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable, d=a,
val=1}"
- );
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
@@ -1500,39 +1507,42 @@ public class IndexTaskTest extends IngestionTestBase
);
Assert.assertEquals(expectedMetrics, reportData.getRowStats());
- Map<String, Object> expectedUnparseables = ImmutableMap.of(
- RowIngestionMeters.DETERMINE_PARTITIONS,
- Arrays.asList(
- "Unable to parse row [this is not JSON]",
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- ),
- RowIngestionMeters.BUILD_SEGMENTS,
- Arrays.asList(
- "Unable to parse row [this is not JSON]",
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
- "Found unparseable columns in row:
[MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z,
event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=4.0,
val=notnumber}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [Unable to
parse value[notnumber] for field[val]]",
- "Found unparseable columns in row:
[MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z,
event={time=2014-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=notnumber, val=1},
dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert value
[notnumber] to float]",
- "Found unparseable columns in row:
[MapBasedInputRow{timestamp=2014-01-01T00:00:10.000Z,
event={time=2014-01-01T00:00:10Z, dim=b, dimLong=notnumber, dimFloat=3.0,
val=1}, dimensions=[dim, dimLong, dimFloat]}], exceptions: [could not convert
value [notnumber] to long]",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- )
- );
-
List<LinkedHashMap> parseExceptionReports = (List<LinkedHashMap>)
reportData
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
- List<String> expectedMessages = Arrays.asList(
- "Unable to parse row [this is not JSON]",
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
- "Unable to parse value[notnumber] for field[val]",
- "could not convert value [notnumber] to float",
- "could not convert value [notnumber] to long",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- );
+ List<String> expectedMessages;
+ if (useInputFormatApi) {
+ expectedMessages = Arrays.asList(
+ StringUtils.format("Unable to parse row [this is not JSON] (Path:
%s, Record: 6, Line: 9)", tmpFile.toURI()),
+ StringUtils.format(
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}
(Path: %s, Record: 6, Line: 8)",
+ tmpFile.toURI()
+ ),
+ StringUtils.format(
+ "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path:
%s, Record: 5, Line: 6)",
+ tmpFile.toURI()
+ ),
+ "Unable to parse value[notnumber] for field[val]",
+ "could not convert value [notnumber] to float",
+ "could not convert value [notnumber] to long",
+ StringUtils.format(
+ "Timestamp[unparseable] is unparseable! Event:
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1,
Line: 1)",
+ tmpFile.toURI()
+ )
+ );
+ } else {
+ expectedMessages = Arrays.asList(
+ "Unable to parse row [this is not JSON]",
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
+ "Unable to parse value[notnumber] for field[val]",
+ "could not convert value [notnumber] to float",
+ "could not convert value [notnumber] to long",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ );
+ }
+
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
@@ -1556,12 +1566,31 @@ public class IndexTaskTest extends IngestionTestBase
.getUnparseableEvents()
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
- expectedMessages = Arrays.asList(
- "Unable to parse row [this is not JSON]",
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- );
+ if (useInputFormatApi) {
+ expectedMessages = Arrays.asList(
+ StringUtils.format("Unable to parse row [this is not JSON] (Path:
%s, Record: 6, Line: 9)", tmpFile.toURI()),
+ StringUtils.format(
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}
(Path: %s, Record: 6, Line: 8)",
+ tmpFile.toURI()
+ ),
+ StringUtils.format(
+ "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}] (Path:
%s, Record: 5, Line: 6)",
+ tmpFile.toURI()
+ ),
+ StringUtils.format(
+ "Timestamp[unparseable] is unparseable! Event:
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1,
Line: 1)",
+ tmpFile.toURI()
+ )
+ );
+ } else {
+ expectedMessages = Arrays.asList(
+ "Unable to parse row [this is not JSON]",
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Unable to parse row
[{\"time\":9.0x,\"dim\":\"a\",\"dimLong\":2,\"dimFloat\":3.0,\"val\":1}]",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ );
+ }
+
actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
@@ -1634,6 +1663,8 @@ public class IndexTaskTest extends IngestionTestBase
);
final List<String> columns = Arrays.asList("time", "dim", "dimLong",
"dimFloat", "val");
final IndexIngestionSpec ingestionSpec;
+
+ List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1647,6 +1678,20 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = Arrays.asList(
+ StringUtils.format(
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}
(Path: %s, Record: 3, Line: 6)",
+ tmpFile.toURI()
+ ),
+ StringUtils.format(
+ "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a,
dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)",
+ tmpFile.toURI()
+ ),
+ StringUtils.format(
+ "Timestamp[unparseable] is unparseable! Event:
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1,
Line: 2)",
+ tmpFile.toURI()
+ )
+ );
} else {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1658,6 +1703,11 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = Arrays.asList(
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2,
dimFloat=3.0, val=1}",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ );
}
IndexTask indexTask = new IndexTask(
@@ -1696,11 +1746,6 @@ public class IndexTaskTest extends IngestionTestBase
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
- List<String> expectedMessages = Arrays.asList(
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2,
dimFloat=3.0, val=1}",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- );
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
@@ -1771,6 +1816,8 @@ public class IndexTaskTest extends IngestionTestBase
);
final List<String> columns = Arrays.asList("time", "dim", "dimLong",
"dimFloat", "val");
final IndexIngestionSpec ingestionSpec;
+
+ List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1784,6 +1831,11 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = Arrays.asList(
+ StringUtils.format("Timestamp[99999999999-01-01T00:00:10Z] is
unparseable! Event: {time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2,
dimFloat=3.0, val=1} (Path: %s, Record: 3, Line: 6)", tmpFile.toURI()),
+ StringUtils.format("Timestamp[9.0] is unparseable! Event: {time=9.0,
dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 2, Line: 4)",
tmpFile.toURI()),
+ StringUtils.format("Timestamp[unparseable] is unparseable! Event:
{time=unparseable, dim=a, dimLong=2, dimFloat=3.0, val=1} (Path: %s, Record: 1,
Line: 2)", tmpFile.toURI())
+ );
} else {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1795,6 +1847,11 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = Arrays.asList(
+ "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
+ "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2,
dimFloat=3.0, val=1}",
+ "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
+ );
}
IndexTask indexTask = new IndexTask(
@@ -1833,11 +1890,6 @@ public class IndexTaskTest extends IngestionTestBase
.getUnparseableEvents()
.get(RowIngestionMeters.DETERMINE_PARTITIONS);
- List<String> expectedMessages = Arrays.asList(
- "Timestamp[99999999999-01-01T00:00:10Z] is unparseable! Event:
{time=99999999999-01-01T00:00:10Z, dim=b, dimLong=2, dimFloat=3.0, val=1}",
- "Timestamp[9.0] is unparseable! Event: {time=9.0, dim=a, dimLong=2,
dimFloat=3.0, val=1}",
- "Timestamp[unparseable] is unparseable! Event: {time=unparseable,
dim=a, dimLong=2, dimFloat=3.0, val=1}"
- );
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
@@ -1957,6 +2009,7 @@ public class IndexTaskTest extends IngestionTestBase
// report parse exception
final IndexTuningConfig tuningConfig = createTuningConfig(2, null, null,
null, null, false, true);
final IndexIngestionSpec ingestionSpec;
+ List<String> expectedMessages;
if (useInputFormatApi) {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1970,6 +2023,12 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = ImmutableList.of(
+ StringUtils.format(
+ "Timestamp[null] is unparseable! Event:
{column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1} (Path: %s, Record: 1,
Line: 2)",
+ tmpFile.toURI()
+ )
+ );
} else {
ingestionSpec = createIngestionSpec(
jsonMapper,
@@ -1981,6 +2040,9 @@ public class IndexTaskTest extends IngestionTestBase
false,
false
);
+ expectedMessages = ImmutableList.of(
+ "Timestamp[null] is unparseable! Event:
{column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
+ );
}
IndexTask indexTask = new IndexTask(
@@ -2001,9 +2063,6 @@ public class IndexTaskTest extends IngestionTestBase
.getUnparseableEvents()
.get(RowIngestionMeters.BUILD_SEGMENTS);
- List<String> expectedMessages = ImmutableList.of(
- "Timestamp[null] is unparseable! Event:
{column_1=2014-01-01T00:00:10Z, column_2=a, column_3=1}"
- );
List<String> actualMessages = parseExceptionReports.stream().map((r) -> {
return ((List<String>) r.get("details")).get(0);
}).collect(Collectors.toList());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 91ea632..b57e55b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -341,7 +341,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
new ParseExceptionReport(
"{ts=2017unparseable}",
"unparseable",
- ImmutableList.of("Timestamp[2017unparseable] is unparseable!
Event: {ts=2017unparseable}"),
+ ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
1L
),
new ParseExceptionReport(
@@ -462,7 +462,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
new ParseExceptionReport(
"{ts=2017unparseable}",
"unparseable",
- ImmutableList.of("Timestamp[2017unparseable] is unparseable!
Event: {ts=2017unparseable}"),
+ ImmutableList.of(getErrorMessageForUnparseableTimestamp()),
1L
),
new ParseExceptionReport(
@@ -989,6 +989,14 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
);
}
+ private String getErrorMessageForUnparseableTimestamp()
+ {
+ return useInputFormatApi ? StringUtils.format(
+ "Timestamp[2017unparseable] is unparseable! Event:
{ts=2017unparseable} (Path: %s, Record: 5, Line: 5)",
+ new File(inputDir, "test_0").toURI()
+ ) : "Timestamp[2017unparseable] is unparseable! Event:
{ts=2017unparseable}";
+ }
+
private static class SettableSplittableLocalInputSource extends
LocalInputSource
{
private final boolean splittableInputSource;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index b8725b1..89a4642 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -186,7 +186,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(0),
null,
true,
- unparseableTimestampErrorString(data.get(0).getInput())
+ unparseableTimestampErrorString(data.get(0).getInput(), 1)
),
data.get(0)
);
@@ -195,7 +195,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(1),
null,
true,
- unparseableTimestampErrorString(data.get(1).getInput())
+ unparseableTimestampErrorString(data.get(1).getInput(), 2)
),
data.get(1)
);
@@ -204,7 +204,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(2),
null,
true,
- unparseableTimestampErrorString(data.get(2).getInput())
+ unparseableTimestampErrorString(data.get(2).getInput(), 3)
),
data.get(2)
);
@@ -213,7 +213,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(3),
null,
true,
- unparseableTimestampErrorString(data.get(3).getInput())
+ unparseableTimestampErrorString(data.get(3).getInput(), 4)
),
data.get(3)
);
@@ -222,7 +222,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(4),
null,
true,
- unparseableTimestampErrorString(data.get(4).getInput())
+ unparseableTimestampErrorString(data.get(4).getInput(), 5)
),
data.get(4)
);
@@ -231,7 +231,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(5),
null,
true,
- unparseableTimestampErrorString(data.get(5).getInput())
+ unparseableTimestampErrorString(data.get(5).getInput(), 6)
),
data.get(5)
);
@@ -259,7 +259,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(0),
null,
true,
- unparseableTimestampErrorString(data.get(0).getInput())
+ unparseableTimestampErrorString(data.get(0).getInput(), 1)
),
data.get(0)
);
@@ -268,7 +268,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(1),
null,
true,
- unparseableTimestampErrorString(data.get(1).getInput())
+ unparseableTimestampErrorString(data.get(1).getInput(), 2)
),
data.get(1)
);
@@ -277,7 +277,7 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
getRawColumns().get(2),
null,
true,
- unparseableTimestampErrorString(data.get(2).getInput())
+ unparseableTimestampErrorString(data.get(2).getInput(), 3)
),
data.get(2)
);
@@ -1248,7 +1248,12 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
//
// first n rows are related to the first json block which fails to parse
//
- String parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable!
Event: {t=bad_timestamp, dim1=foo, met1=6}";
+ String parseExceptionMessage;
+ if (useInputFormatApi) {
+ parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, met1=6}";
+ } else {
+ parseExceptionMessage = "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, met1=6}";
+ }
for (; index < illegalRows; index++) {
assertEqualsSamplerResponseRow(
new SamplerResponseRow(
@@ -1436,14 +1441,24 @@ public class InputSourceSamplerTest extends
InitializedNullHandlingTest
private String getUnparseableTimestampString()
{
- return ParserType.STR_CSV.equals(parserType)
- ? "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
- : "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, met1=6}";
+ if (useInputFormatApi) {
+ return ParserType.STR_CSV.equals(parserType)
+ ? "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, dim2=null, met1=6} (Line: 6)"
+ : "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, met1=6} (Line: 6)";
+ } else {
+ return ParserType.STR_CSV.equals(parserType)
+ ? "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, dim2=null, met1=6}"
+ : "Timestamp[bad_timestamp] is unparseable! Event:
{t=bad_timestamp, dim1=foo, met1=6}";
+ }
}
- private String unparseableTimestampErrorString(Map<String, Object>
rawColumns)
+ private String unparseableTimestampErrorString(Map<String, Object>
rawColumns, int line)
{
- return StringUtils.format("Timestamp[null] is unparseable! Event: %s",
rawColumns);
+ if (useInputFormatApi) {
+ return StringUtils.format("Timestamp[null] is unparseable! Event: %s
(Line: %d)", rawColumns, line);
+ } else {
+ return StringUtils.format("Timestamp[null] is unparseable! Event: %s",
rawColumns);
+ }
}
@Nullable
diff --git
a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
index 0709799..0a503eb 100644
--- a/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
+++ b/server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
@@ -76,6 +76,12 @@ public class SqlReader extends
IntermediateRowParsingReader<Map<String, Object>>
}
@Override
+ protected InputEntity source()
+ {
+ return source;
+ }
+
+ @Override
protected List<InputRow> parseInputRows(Map<String, Object> intermediateRow)
throws ParseException
{
return Collections.singletonList(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]