This is an automated email from the ASF dual-hosted git repository.
suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 5d8a69e fixed input source sampler buildReader exp (#10651)
5d8a69e is described below
commit 5d8a69ecf9fa1bbe212bc1ee433092be0425f3b6
Author: Egor Riashin <[email protected]>
AuthorDate: Sat Feb 6 00:33:55 2021 +0300
fixed input source sampler buildReader exp (#10651)
* fixed input source sampler buildReader exp
* sampler exception unit test
* styling
Co-authored-by: egor-ryashin <[email protected]>
---
.../overlord/sampler/InputSourceSampler.java | 156 +++++++++++----------
.../overlord/sampler/InputSourceSamplerTest.java | 33 +++++
2 files changed, 117 insertions(+), 72 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 2831734..4342460 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -105,90 +105,102 @@ public class InputSourceSampler
final File tempDir = FileUtils.createTempDir();
closer.register(() -> FileUtils.deleteDirectory(tempDir));
- final InputSourceReader reader = buildReader(
- nonNullSamplerConfig,
- nonNullDataSchema,
- inputSource,
- inputFormat,
- tempDir
- );
- try (final CloseableIterator<InputRowListPlusRawValues> iterator =
reader.sample();
- final IncrementalIndex<Aggregator> index =
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
- final Closer closer1 = closer) {
- List<SamplerResponseRow> responseRows = new
ArrayList<>(nonNullSamplerConfig.getNumRows());
- int numRowsIndexed = 0;
-
- while (responseRows.size() < nonNullSamplerConfig.getNumRows() &&
iterator.hasNext()) {
- final InputRowListPlusRawValues inputRowListPlusRawValues =
iterator.next();
-
- final List<Map<String, Object>> rawColumnsList =
inputRowListPlusRawValues.getRawValuesList();
-
- final ParseException parseException =
inputRowListPlusRawValues.getParseException();
- if (parseException != null) {
- if (rawColumnsList != null) {
- // add all rows to response
- responseRows.addAll(rawColumnsList.stream()
- .map(rawColumns -> new
SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
- .collect(Collectors.toList()));
- } else {
- // no data parsed, add one response row
- responseRows.add(new SamplerResponseRow(null, null, true,
parseException.getMessage()));
+ try {
+ final InputSourceReader reader = buildReader(
+ nonNullSamplerConfig,
+ nonNullDataSchema,
+ inputSource,
+ inputFormat,
+ tempDir
+ );
+ try (final CloseableIterator<InputRowListPlusRawValues> iterator =
reader.sample();
+ final IncrementalIndex<Aggregator> index =
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
+ final Closer closer1 = closer) {
+ List<SamplerResponseRow> responseRows = new
ArrayList<>(nonNullSamplerConfig.getNumRows());
+ int numRowsIndexed = 0;
+
+ while (responseRows.size() < nonNullSamplerConfig.getNumRows() &&
iterator.hasNext()) {
+ final InputRowListPlusRawValues inputRowListPlusRawValues =
iterator.next();
+
+ final List<Map<String, Object>> rawColumnsList =
inputRowListPlusRawValues.getRawValuesList();
+
+ final ParseException parseException =
inputRowListPlusRawValues.getParseException();
+ if (parseException != null) {
+ if (rawColumnsList != null) {
+ // add all rows to response
+ responseRows.addAll(rawColumnsList.stream()
+ .map(rawColumns -> new
SamplerResponseRow(
+ rawColumns,
+ null,
+ true,
+ parseException.getMessage()
+ ))
+ .collect(Collectors.toList()));
+ } else {
+ // no data parsed, add one response row
+ responseRows.add(new SamplerResponseRow(null, null, true,
parseException.getMessage()));
+ }
+ continue;
}
- continue;
- }
- List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
- if (inputRows == null) {
- continue;
- }
+ List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
+ if (inputRows == null) {
+ continue;
+ }
- for (int i = 0; i < inputRows.size(); i++) {
- // InputRowListPlusRawValues guarantees the size of rawColumnsList
and inputRows are the same
- Map<String, Object> rawColumns = rawColumnsList == null ? null :
rawColumnsList.get(i);
- InputRow row = inputRows.get(i);
-
- //keep the index of the row to be added to responseRows for further
use
- final int rowIndex = responseRows.size();
- IncrementalIndexAddResult addResult = index.add(new
SamplerInputRow(row, rowIndex), true);
- if (addResult.hasParseException()) {
- responseRows.add(new SamplerResponseRow(rawColumns, null, true,
addResult.getParseException().getMessage()));
- } else {
- // store the raw value; will be merged with the data from the
IncrementalIndex later
- responseRows.add(new SamplerResponseRow(rawColumns, null, null,
null));
- numRowsIndexed++;
+ for (int i = 0; i < inputRows.size(); i++) {
+ // InputRowListPlusRawValues guarantees the size of rawColumnsList
and inputRows are the same
+ Map<String, Object> rawColumns = rawColumnsList == null ? null :
rawColumnsList.get(i);
+ InputRow row = inputRows.get(i);
+
+ //keep the index of the row to be added to responseRows for
further use
+ final int rowIndex = responseRows.size();
+ IncrementalIndexAddResult addResult = index.add(new
SamplerInputRow(row, rowIndex), true);
+ if (addResult.hasParseException()) {
+ responseRows.add(new SamplerResponseRow(
+ rawColumns,
+ null,
+ true,
+ addResult.getParseException().getMessage()
+ ));
+ } else {
+ // store the raw value; will be merged with the data from the
IncrementalIndex later
+ responseRows.add(new SamplerResponseRow(rawColumns, null, null,
null));
+ numRowsIndexed++;
+ }
}
}
- }
- final List<String> columnNames = index.getColumnNames();
- columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+ final List<String> columnNames = index.getColumnNames();
+ columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
- for (Row row : index) {
- Map<String, Object> parsed = new HashMap<>();
+ for (Row row : index) {
+ Map<String, Object> parsed = new HashMap<>();
- columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
- parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+ columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
+ parsed.put(ColumnHolder.TIME_COLUMN_NAME,
row.getTimestampFromEpoch());
- Number sortKey =
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
- if (sortKey != null) {
- responseRows.set(sortKey.intValue(),
responseRows.get(sortKey.intValue()).withParsed(parsed));
+ Number sortKey =
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+ if (sortKey != null) {
+ responseRows.set(sortKey.intValue(),
responseRows.get(sortKey.intValue()).withParsed(parsed));
+ }
}
- }
- // make sure size of responseRows meets the input
- if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
- responseRows = responseRows.subList(0,
nonNullSamplerConfig.getNumRows());
- }
+ // make sure size of responseRows meets the input
+ if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
+ responseRows = responseRows.subList(0,
nonNullSamplerConfig.getNumRows());
+ }
- int numRowsRead = responseRows.size();
- return new SamplerResponse(
- numRowsRead,
- numRowsIndexed,
- responseRows.stream()
- .filter(Objects::nonNull)
- .filter(x -> x.getParsed() != null || x.isUnparseable()
!= null)
- .collect(Collectors.toList())
- );
+ int numRowsRead = responseRows.size();
+ return new SamplerResponse(
+ numRowsRead,
+ numRowsIndexed,
+ responseRows.stream()
+ .filter(Objects::nonNull)
+ .filter(x -> x.getParsed() != null ||
x.isUnparseable() != null)
+ .collect(Collectors.toList())
+ );
+ }
}
catch (Exception e) {
throw new SamplerException(e, "Failed to sample data: %s",
e.getMessage());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 827310e..b8725b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -28,7 +28,9 @@ import org.apache.druid.client.indexing.SamplerResponse;
import org.apache.druid.client.indexing.SamplerResponse.SamplerResponseRow;
import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.CsvInputFormat;
import org.apache.druid.data.input.impl.DelimitedParseSpec;
@@ -71,6 +73,7 @@ import org.junit.runners.Parameterized;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -1289,6 +1292,36 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
);
}
+ @Test(expected = SamplerException.class)
+ public void testReaderCreationException()
+ {
+ InputSource failingReaderInputSource = new InputSource()
+ {
+ @Override
+ public boolean isSplittable()
+ {
+ return false;
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ return false;
+ }
+
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ throw new RuntimeException();
+ }
+ };
+ inputSourceSampler.sample(failingReaderInputSource, null, null, null);
+ }
+
private List<String> getTestRows()
{
switch (parserType) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]