This is an automated email from the ASF dual-hosted git repository.

egorryashin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f46cc4f  Revert "fixed input source sampler buildReader exp"
f46cc4f is described below

commit f46cc4faaf9985cc1ae0d32acb945b93627d8f98
Author: egor-ryashin <[email protected]>
AuthorDate: Mon Dec 7 18:34:59 2020 +0300

    Revert "fixed input source sampler buildReader exp"
    
    This reverts commit e688db8
---
 .../overlord/sampler/InputSourceSampler.java       | 156 ++++++++++-----------
 1 file changed, 72 insertions(+), 84 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index c9984c4..1b25279 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -103,102 +103,90 @@ public class InputSourceSampler
     final File tempDir = FileUtils.createTempDir();
     closer.register(() -> FileUtils.deleteDirectory(tempDir));
 
-    try {
-      final InputSourceReader reader = buildReader(
-          nonNullSamplerConfig,
-          nonNullDataSchema,
-          inputSource,
-          inputFormat,
-          tempDir
-      );
-      try (final CloseableIterator<InputRowListPlusRawValues> iterator = 
reader.sample();
-           final IncrementalIndex<Aggregator> index = 
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
-           final Closer closer1 = closer) {
-        List<SamplerResponseRow> responseRows = new 
ArrayList<>(nonNullSamplerConfig.getNumRows());
-        int numRowsIndexed = 0;
-
-        while (responseRows.size() < nonNullSamplerConfig.getNumRows() && 
iterator.hasNext()) {
-          final InputRowListPlusRawValues inputRowListPlusRawValues = 
iterator.next();
-
-          final List<Map<String, Object>> rawColumnsList = 
inputRowListPlusRawValues.getRawValuesList();
-
-          final ParseException parseException = 
inputRowListPlusRawValues.getParseException();
-          if (parseException != null) {
-            if (rawColumnsList != null) {
-              // add all rows to response
-              responseRows.addAll(rawColumnsList.stream()
-                                                .map(rawColumns -> new 
SamplerResponseRow(
-                                                    rawColumns,
-                                                    null,
-                                                    true,
-                                                    parseException.getMessage()
-                                                ))
-                                                .collect(Collectors.toList()));
-            } else {
-              // no data parsed, add one response row
-              responseRows.add(new SamplerResponseRow(null, null, true, 
parseException.getMessage()));
-            }
-            continue;
+    final InputSourceReader reader = buildReader(
+        nonNullSamplerConfig,
+        nonNullDataSchema,
+        inputSource,
+        inputFormat,
+        tempDir
+    );
+    try (final CloseableIterator<InputRowListPlusRawValues> iterator = 
reader.sample();
+         final IncrementalIndex<Aggregator> index = 
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
+         final Closer closer1 = closer) {
+      List<SamplerResponseRow> responseRows = new 
ArrayList<>(nonNullSamplerConfig.getNumRows());
+      int numRowsIndexed = 0;
+
+      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && 
iterator.hasNext()) {
+        final InputRowListPlusRawValues inputRowListPlusRawValues = 
iterator.next();
+
+        final List<Map<String, Object>> rawColumnsList = 
inputRowListPlusRawValues.getRawValuesList();
+
+        final ParseException parseException = 
inputRowListPlusRawValues.getParseException();
+        if (parseException != null) {
+          if (rawColumnsList != null) {
+            // add all rows to response
+            responseRows.addAll(rawColumnsList.stream()
+                                              .map(rawColumns -> new 
SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
+                                              .collect(Collectors.toList()));
+          } else {
+            // no data parsed, add one response row
+            responseRows.add(new SamplerResponseRow(null, null, true, 
parseException.getMessage()));
           }
+          continue;
+        }
 
-          List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
-          if (inputRows == null) {
-            continue;
-          }
+        List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
+        if (inputRows == null) {
+          continue;
+        }
 
-          for (int i = 0; i < inputRows.size(); i++) {
-            // InputRowListPlusRawValues guarantees the size of rawColumnsList 
and inputRows are the same
-            Map<String, Object> rawColumns = rawColumnsList == null ? null : 
rawColumnsList.get(i);
-            InputRow row = inputRows.get(i);
-
-            //keep the index of the row to be added to responseRows for 
further use
-            final int rowIndex = responseRows.size();
-            IncrementalIndexAddResult addResult = index.add(new 
SamplerInputRow(row, rowIndex), true);
-            if (addResult.hasParseException()) {
-              responseRows.add(new SamplerResponseRow(
-                  rawColumns,
-                  null,
-                  true,
-                  addResult.getParseException().getMessage()
-              ));
-            } else {
-              // store the raw value; will be merged with the data from the 
IncrementalIndex later
-              responseRows.add(new SamplerResponseRow(rawColumns, null, null, 
null));
-              numRowsIndexed++;
-            }
+        for (int i = 0; i < inputRows.size(); i++) {
+          // InputRowListPlusRawValues guarantees the size of rawColumnsList 
and inputRows are the same
+          Map<String, Object> rawColumns = rawColumnsList == null ? null : 
rawColumnsList.get(i);
+          InputRow row = inputRows.get(i);
+
+          //keep the index of the row to be added to responseRows for further 
use
+          final int rowIndex = responseRows.size();
+          IncrementalIndexAddResult addResult = index.add(new 
SamplerInputRow(row, rowIndex), true);
+          if (addResult.hasParseException()) {
+            responseRows.add(new SamplerResponseRow(rawColumns, null, true, 
addResult.getParseException().getMessage()));
+          } else {
+            // store the raw value; will be merged with the data from the 
IncrementalIndex later
+            responseRows.add(new SamplerResponseRow(rawColumns, null, null, 
null));
+            numRowsIndexed++;
           }
         }
+      }
 
-        final List<String> columnNames = index.getColumnNames();
-        columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
-
-        for (Row row : index) {
-          Map<String, Object> parsed = new HashMap<>();
+      final List<String> columnNames = index.getColumnNames();
+      columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
 
-          columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
-          parsed.put(ColumnHolder.TIME_COLUMN_NAME, 
row.getTimestampFromEpoch());
+      for (Row row : index) {
+        Map<String, Object> parsed = new HashMap<>();
 
-          Number sortKey = 
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
-          if (sortKey != null) {
-            responseRows.set(sortKey.intValue(), 
responseRows.get(sortKey.intValue()).withParsed(parsed));
-          }
-        }
+        columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
+        parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
 
-        // make sure size of responseRows meets the input
-        if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
-          responseRows = responseRows.subList(0, 
nonNullSamplerConfig.getNumRows());
+        Number sortKey = 
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+        if (sortKey != null) {
+          responseRows.set(sortKey.intValue(), 
responseRows.get(sortKey.intValue()).withParsed(parsed));
         }
+      }
 
-        int numRowsRead = responseRows.size();
-        return new SamplerResponse(
-            numRowsRead,
-            numRowsIndexed,
-            responseRows.stream()
-                        .filter(Objects::nonNull)
-                        .filter(x -> x.getParsed() != null || 
x.isUnparseable() != null)
-                        .collect(Collectors.toList())
-        );
+      // make sure size of responseRows meets the input
+      if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
+        responseRows = responseRows.subList(0, 
nonNullSamplerConfig.getNumRows());
       }
+
+      int numRowsRead = responseRows.size();
+      return new SamplerResponse(
+          numRowsRead,
+          numRowsIndexed,
+          responseRows.stream()
+                      .filter(Objects::nonNull)
+                      .filter(x -> x.getParsed() != null || x.isUnparseable() 
!= null)
+                      .collect(Collectors.toList())
+      );
     }
     catch (Exception e) {
       throw new SamplerException(e, "Failed to sample data: %s", 
e.getMessage());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to