This is an automated email from the ASF dual-hosted git repository.

suneet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d8a69e  fixed input source sampler buildReader exp (#10651)
5d8a69e is described below

commit 5d8a69ecf9fa1bbe212bc1ee433092be0425f3b6
Author: Egor Riashin <[email protected]>
AuthorDate: Sat Feb 6 00:33:55 2021 +0300

    fixed input source sampler buildReader exp (#10651)
    
    * fixed input source sampler buildReader exp
    
    * sampler exception unit test
    
    * styling
    
    Co-authored-by: egor-ryashin <[email protected]>
---
 .../overlord/sampler/InputSourceSampler.java       | 156 +++++++++++----------
 .../overlord/sampler/InputSourceSamplerTest.java   |  33 +++++
 2 files changed, 117 insertions(+), 72 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 2831734..4342460 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -105,90 +105,102 @@ public class InputSourceSampler
     final File tempDir = FileUtils.createTempDir();
     closer.register(() -> FileUtils.deleteDirectory(tempDir));
 
-    final InputSourceReader reader = buildReader(
-        nonNullSamplerConfig,
-        nonNullDataSchema,
-        inputSource,
-        inputFormat,
-        tempDir
-    );
-    try (final CloseableIterator<InputRowListPlusRawValues> iterator = 
reader.sample();
-         final IncrementalIndex<Aggregator> index = 
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
-         final Closer closer1 = closer) {
-      List<SamplerResponseRow> responseRows = new 
ArrayList<>(nonNullSamplerConfig.getNumRows());
-      int numRowsIndexed = 0;
-
-      while (responseRows.size() < nonNullSamplerConfig.getNumRows() && 
iterator.hasNext()) {
-        final InputRowListPlusRawValues inputRowListPlusRawValues = 
iterator.next();
-
-        final List<Map<String, Object>> rawColumnsList = 
inputRowListPlusRawValues.getRawValuesList();
-
-        final ParseException parseException = 
inputRowListPlusRawValues.getParseException();
-        if (parseException != null) {
-          if (rawColumnsList != null) {
-            // add all rows to response
-            responseRows.addAll(rawColumnsList.stream()
-                                              .map(rawColumns -> new 
SamplerResponseRow(rawColumns, null, true, parseException.getMessage()))
-                                              .collect(Collectors.toList()));
-          } else {
-            // no data parsed, add one response row
-            responseRows.add(new SamplerResponseRow(null, null, true, 
parseException.getMessage()));
+    try {
+      final InputSourceReader reader = buildReader(
+          nonNullSamplerConfig,
+          nonNullDataSchema,
+          inputSource,
+          inputFormat,
+          tempDir
+      );
+      try (final CloseableIterator<InputRowListPlusRawValues> iterator = 
reader.sample();
+           final IncrementalIndex<Aggregator> index = 
buildIncrementalIndex(nonNullSamplerConfig, nonNullDataSchema);
+           final Closer closer1 = closer) {
+        List<SamplerResponseRow> responseRows = new 
ArrayList<>(nonNullSamplerConfig.getNumRows());
+        int numRowsIndexed = 0;
+
+        while (responseRows.size() < nonNullSamplerConfig.getNumRows() && 
iterator.hasNext()) {
+          final InputRowListPlusRawValues inputRowListPlusRawValues = 
iterator.next();
+
+          final List<Map<String, Object>> rawColumnsList = 
inputRowListPlusRawValues.getRawValuesList();
+
+          final ParseException parseException = 
inputRowListPlusRawValues.getParseException();
+          if (parseException != null) {
+            if (rawColumnsList != null) {
+              // add all rows to response
+              responseRows.addAll(rawColumnsList.stream()
+                                                .map(rawColumns -> new 
SamplerResponseRow(
+                                                    rawColumns,
+                                                    null,
+                                                    true,
+                                                    parseException.getMessage()
+                                                ))
+                                                .collect(Collectors.toList()));
+            } else {
+              // no data parsed, add one response row
+              responseRows.add(new SamplerResponseRow(null, null, true, 
parseException.getMessage()));
+            }
+            continue;
           }
-          continue;
-        }
 
-        List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
-        if (inputRows == null) {
-          continue;
-        }
+          List<InputRow> inputRows = inputRowListPlusRawValues.getInputRows();
+          if (inputRows == null) {
+            continue;
+          }
 
-        for (int i = 0; i < inputRows.size(); i++) {
-          // InputRowListPlusRawValues guarantees the size of rawColumnsList 
and inputRows are the same
-          Map<String, Object> rawColumns = rawColumnsList == null ? null : 
rawColumnsList.get(i);
-          InputRow row = inputRows.get(i);
-
-          //keep the index of the row to be added to responseRows for further 
use
-          final int rowIndex = responseRows.size();
-          IncrementalIndexAddResult addResult = index.add(new 
SamplerInputRow(row, rowIndex), true);
-          if (addResult.hasParseException()) {
-            responseRows.add(new SamplerResponseRow(rawColumns, null, true, 
addResult.getParseException().getMessage()));
-          } else {
-            // store the raw value; will be merged with the data from the 
IncrementalIndex later
-            responseRows.add(new SamplerResponseRow(rawColumns, null, null, 
null));
-            numRowsIndexed++;
+          for (int i = 0; i < inputRows.size(); i++) {
+            // InputRowListPlusRawValues guarantees the size of rawColumnsList 
and inputRows are the same
+            Map<String, Object> rawColumns = rawColumnsList == null ? null : 
rawColumnsList.get(i);
+            InputRow row = inputRows.get(i);
+
+            //keep the index of the row to be added to responseRows for 
further use
+            final int rowIndex = responseRows.size();
+            IncrementalIndexAddResult addResult = index.add(new 
SamplerInputRow(row, rowIndex), true);
+            if (addResult.hasParseException()) {
+              responseRows.add(new SamplerResponseRow(
+                  rawColumns,
+                  null,
+                  true,
+                  addResult.getParseException().getMessage()
+              ));
+            } else {
+              // store the raw value; will be merged with the data from the 
IncrementalIndex later
+              responseRows.add(new SamplerResponseRow(rawColumns, null, null, 
null));
+              numRowsIndexed++;
+            }
           }
         }
-      }
 
-      final List<String> columnNames = index.getColumnNames();
-      columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+        final List<String> columnNames = index.getColumnNames();
+        columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
 
-      for (Row row : index) {
-        Map<String, Object> parsed = new HashMap<>();
+        for (Row row : index) {
+          Map<String, Object> parsed = new HashMap<>();
 
-        columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
-        parsed.put(ColumnHolder.TIME_COLUMN_NAME, row.getTimestampFromEpoch());
+          columnNames.forEach(k -> parsed.put(k, row.getRaw(k)));
+          parsed.put(ColumnHolder.TIME_COLUMN_NAME, 
row.getTimestampFromEpoch());
 
-        Number sortKey = 
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
-        if (sortKey != null) {
-          responseRows.set(sortKey.intValue(), 
responseRows.get(sortKey.intValue()).withParsed(parsed));
+          Number sortKey = 
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
+          if (sortKey != null) {
+            responseRows.set(sortKey.intValue(), 
responseRows.get(sortKey.intValue()).withParsed(parsed));
+          }
         }
-      }
 
-      // make sure size of responseRows meets the input
-      if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
-        responseRows = responseRows.subList(0, 
nonNullSamplerConfig.getNumRows());
-      }
+        // make sure size of responseRows meets the input
+        if (responseRows.size() > nonNullSamplerConfig.getNumRows()) {
+          responseRows = responseRows.subList(0, 
nonNullSamplerConfig.getNumRows());
+        }
 
-      int numRowsRead = responseRows.size();
-      return new SamplerResponse(
-          numRowsRead,
-          numRowsIndexed,
-          responseRows.stream()
-                      .filter(Objects::nonNull)
-                      .filter(x -> x.getParsed() != null || x.isUnparseable() 
!= null)
-                      .collect(Collectors.toList())
-      );
+        int numRowsRead = responseRows.size();
+        return new SamplerResponse(
+            numRowsRead,
+            numRowsIndexed,
+            responseRows.stream()
+                        .filter(Objects::nonNull)
+                        .filter(x -> x.getParsed() != null || 
x.isUnparseable() != null)
+                        .collect(Collectors.toList())
+        );
+      }
     }
     catch (Exception e) {
       throw new SamplerException(e, "Failed to sample data: %s", 
e.getMessage());
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 827310e..b8725b1 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -28,7 +28,9 @@ import org.apache.druid.client.indexing.SamplerResponse;
 import org.apache.druid.client.indexing.SamplerResponse.SamplerResponseRow;
 import org.apache.druid.data.input.FirehoseFactoryToInputSourceAdaptor;
 import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
 import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
 import org.apache.druid.data.input.impl.ByteEntity;
 import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DelimitedParseSpec;
@@ -71,6 +73,7 @@ import org.junit.runners.Parameterized;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -1289,6 +1292,36 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
     );
   }
 
+  @Test(expected = SamplerException.class)
+  public void testReaderCreationException()
+  {
+    InputSource failingReaderInputSource = new InputSource()
+    {
+      @Override
+      public boolean isSplittable()
+      {
+        return false;
+      }
+
+      @Override
+      public boolean needsFormat()
+      {
+        return false;
+      }
+
+      @Override
+      public InputSourceReader reader(
+          InputRowSchema inputRowSchema,
+          @Nullable InputFormat inputFormat,
+          File temporaryDirectory
+      )
+      {
+        throw new RuntimeException();
+      }
+    };
+    inputSourceSampler.sample(failingReaderInputSource, null, null, null);
+  }
+
   private List<String> getTestRows()
   {
     switch (parserType) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to