This is an automated email from the ASF dual-hosted git repository.
egorryashin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new e688db8 fixed input source sampler buildReader exp
e688db8 is described below
commit e688db85037acb4b3b039be30f4b51e564f38efc
Author: egor-ryashin <[email protected]>
AuthorDate: Mon Dec 7 18:28:25 2020 +0300
fixed input source sampler buildReader exp
---
.../overlord/sampler/InputSourceSampler.java | 156 +++++++++++----------
1 file changed, 84 insertions(+), 72 deletions(-)
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 1b25279..c9984c4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -103,90 +103,102 @@ public class InputSourceSampler
final File tempDir = FileUtils.createTempDir();
closer.register(() -> FileUtils.deleteDirectory(tempDir));
- final InputSourceReader reader = buildReader(
- nonNullSamplerConfig,
- nonNullDataSchema,
- inputSource,
- inputFormat,
- tempDir
- );
- try (final CloseableIterator<InputRowListPlusRawValues> iterator =
reader.sample();
- final IncrementalIndex<Aggregator> index =
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
- final Closer closer1 = closer) {
- List<SamplerResponseRow> responseRows = new
ArrayList<>(nonNullSamplerConfig.getNumRows());
- int numRowsIndexed = 0;
-
- while (responseRows.size() < nonNullSamplerConfig.getNumRows() &&
iterator.hasNext()) {
- final InputRowListPlusRawValues inputRowListPlusRawValues =
iterator.next();
-
- final List<Map<String, Object>> rawColumnsList =
inputRowListPlusRawValues.getRawValuesList();
-
- final ParseException parseException =
inputRowListPlusRawValues.getParseException();
- if (parseException != null) {
- if (rawColumnsList != null) {
- // add all rows to response
- responseRows.addAll(rawColumnsList.stream()
- .map(rawColumns -> new
SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
- .collect(Collectors.toList()));
- } else {
- // no data parsed, add one response row
- responseRows.add(new SamplerResponseRow(null, null, true,
parseException.getMessage()));
+ try {
+ final InputSourceReader reader = buildReader(
+ nonNullSamplerConfig,
+ nonNullDataSchema,
+ inputSource,
+ inputFormat,
+ tempDir
+ );
+ try (final CloseableIterator<InputRowListPlusRawValues> iterator =
reader.sample();
+ final IncrementalIndex<Aggregator> index =
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
+ final Closer closer1 = closer) {
+ List<SamplerResponseRow> responseRows = new
ArrayList<>(nonNullSamplerConfig.getNumRows());
+ int numRowsIndexed = 0;
+
+ while (responseRows.size() < nonNullSamplerConfig.getNumRows() &&
iterator.hasNext()) {
+ final InputRowListPlusRawValues inputRowListPlusRawValues =
iterator.next();
+
+ final List<Map<String, Object>> rawColumnsList =
inputRowListPlusRawValues.getRawValuesList();
+
+ final ParseException parseException =
inputRowListPlusRawValues.getParseException();
+ if (parseException != null) {
+ if (rawColumnsList != null) {
+ // add all rows to response
+ responseRows.addAll(rawColumnsList.stream()
+ .map(rawColumns -> new
SamplerResponseRow(
+ rawColumns,
+ null,
+ true,
+ parseException.getMessage()
+ ))
+ .collect(Collectors.toList()));
+ } else {
+ // no data parsed, add one response row
+ responseRows.add(new SamplerResponseRow(null, null, true,
parseException.getMessage()));
+ }
+ continue;
}
- continue;
- }
- List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
- if (inputRows == null) {
- continue;
- }
+ List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
+ if (inputRows == null) {
+ continue;
+ }
- for (int i = 0; i < inputRows.size(); i++) {
- // InputRowListPlusRawValues guarantees the size of rawColumnsList
and inputRows are the same
- Map<String, Object> rawColumns = rawColumnsList == null ? null :
rawColumnsList.get(i);
- InputRow row = inputRows.get(i);
-
- //keep the index of the row to be added to responseRows for further
use
- final int rowIndex = responseRows.size();
- IncrementalIndexAddResult addResult = index.add(new
SamplerInputRow(row, rowIndex), true);
- if (addResult.hasParseException()) {
- responseRows.add(new SamplerResponseRow(rawColumns, null, true,
addResult.getParseException().getMessage()));
- } else {
- // store the raw value; will be merged with the data from the
IncrementalIndex later
- responseRows.add(new SamplerResponseRow(rawColumns, null, null,
null));
- numRowsIndexed++;
+ for (int i = 0; i < inputRows.size(); i++) {
+ // InputRowListPlusRawValues guarantees the size of rawColumnsList
and inputRows are the same
+ Map<String, Object> rawColumns = rawColumnsList == null ? null :
rawColumnsList.get(i);
+ InputRow row = inputRows.get(i);
+
+ //keep the index of the row to be added to responseRows for
further use
+ final int rowIndex = responseRows.size();
+ IncrementalIndexAddResult addResult = index.add(new
SamplerInputRow(row, rowIndex), true);
+ if (addResult.hasParseException()) {
+ responseRows.add(new SamplerResponseRow(
+ rawColumns,
+ null,
+ true,
+ addResult.getParseException().getMessage()
+ ));
+ } else {
+ // store the raw value; will be merged with the data from the
IncrementalIndex later
+ responseRows.add(new SamplerResponseRow(rawColumns, null, null,
null));
+ numRowsIndexed++;
+ }
}
}
- }
- final List<String> columnNames = index.getColumnNames();
- columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+ final List<String> columnNames = index.getColumnNames();
+ columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
- for (Row row : index) {
- Map<String, Object> parsed = new HashMap<>();
+ for (Row row : index) {
+ Map<String, Object> parsed = new HashMap<>();
- columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
- parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+ columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
+ parsed.put(ColumnHolder.TIME_COLUMN_NAME,
row.getTimestampFromEpoch());
- Number sortKey =
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
- if (sortKey != null) {
- responseRows.set(sortKey.intValue(),
responseRows.get(sortKey.intValue()).withParsed(parsed));
+ Number sortKey =
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+ if (sortKey != null) {
+ responseRows.set(sortKey.intValue(),
responseRows.get(sortKey.intValue()).withParsed(parsed));
+ }
}
- }
- // make sure size of responseRows meets the input
- if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
- responseRows = responseRows.subList(0,
nonNullSamplerConfig.getNumRows());
- }
+ // make sure size of responseRows meets the input
+ if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
+ responseRows = responseRows.subList(0,
nonNullSamplerConfig.getNumRows());
+ }
- int numRowsRead = responseRows.size();
- return new SamplerResponse(
- numRowsRead,
- numRowsIndexed,
- responseRows.stream()
- .filter(Objects::nonNull)
- .filter(x -> x.getParsed() != null || x.isUnparseable()
!= null)
- .collect(Collectors.toList())
- );
+ int numRowsRead = responseRows.size();
+ return new SamplerResponse(
+ numRowsRead,
+ numRowsIndexed,
+ responseRows.stream()
+ .filter(Objects::nonNull)
+ .filter(x -> x.getParsed() != null ||
x.isUnparseable() != null)
+ .collect(Collectors.toList())
+ );
+ }
}
catch (Exception e) {
throw new SamplerException(e, "Failed to sample data: %s",
e.getMessage());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]