This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ee8786d3c add maxBytesInMemory and maxClientResponseBytes to SamplerConfig (#12947)
8ee8786d3c is described below

commit 8ee8786d3ce32f39d8517b5a0dddd6211101b2a4
Author: Clint Wylie <[email protected]>
AuthorDate: Thu Aug 25 00:50:41 2022 -0700

    add maxBytesInMemory and maxClientResponseBytes to SamplerConfig (#12947)
    
    * add maxBytesInMemory and maxClientResponseBytes to SamplerConfig
---
 .../java/util/common/parsers/ObjectFlatteners.java |   2 +-
 .../util/common/parsers/ObjectFlattenersTest.java  |   4 +
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java |   8 +-
 .../indexing/kinesis/KinesisSamplerSpecTest.java   |   5 +-
 .../overlord/sampler/InputSourceSampler.java       |  45 +++++++-
 .../indexing/overlord/sampler/SamplerConfig.java   |  54 +++++++++-
 .../sampler/CsvInputSourceSamplerTest.java         |   3 +-
 .../overlord/sampler/InputSourceSamplerTest.java   | 119 +++++++++++++++++++--
 8 files changed, 220 insertions(+), 20 deletions(-)

diff --git a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
index 77ae467e85..876885344b 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/parsers/ObjectFlatteners.java
@@ -89,7 +89,7 @@ public class ObjectFlatteners
           @Override
           public boolean isEmpty()
           {
-            throw new UnsupportedOperationException();
+            return keySet().isEmpty();
           }
 
           @Override
diff --git a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
index e48c4dafe8..2b610690db 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/parsers/ObjectFlattenersTest.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -45,9 +46,12 @@ public class ObjectFlattenersTest
   {
     JsonNode node = OBJECT_MAPPER.readTree(SOME_JSON);
     Map<String, Object> flat = FLATTENER.flatten(node);
+    Assert.assertEquals(ImmutableSet.of("extract", "foo", "bar"), 
flat.keySet());
+    Assert.assertFalse(flat.isEmpty());
     Assert.assertNull(flat.get("foo"));
     Assert.assertEquals(1L, flat.get("bar"));
     Assert.assertEquals(1L, flat.get("extract"));
+    Assert.assertEquals("{\"extract\":1,\"foo\":null,\"bar\":1}", 
OBJECT_MAPPER.writeValueAsString(flat));
   }
 
   @Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 92ab899bbf..bf5831ccae 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -160,8 +160,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
 
@@ -335,8 +335,8 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
 
     KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(OBJECT_MAPPER),
         OBJECT_MAPPER
     );
 
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
index 918b25d490..9a446f5265 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -40,6 +40,7 @@ import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
 import org.apache.druid.indexing.overlord.sampler.SamplerTestUtils;
 import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.parsers.JSONPathSpec;
@@ -170,8 +171,8 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
 
     KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
         supervisorSpec,
-        new SamplerConfig(5, null),
-        new InputSourceSampler(),
+        new SamplerConfig(5, null, null, null),
+        new InputSourceSampler(new DefaultObjectMapper()),
         null
     );
 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
index 21685c4d74..c91d8434c2 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/InputSourceSampler.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.indexing.overlord.sampler;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.druid.client.indexing.SamplerResponse;
@@ -33,6 +34,7 @@ import org.apache.druid.data.input.Row;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.TimedShutoffInputSourceReader;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.indexing.input.InputRowSchemas;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.FileUtils;
@@ -49,6 +51,7 @@ import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.segment.indexing.DataSchema;
 
 import javax.annotation.Nullable;
+import javax.inject.Inject;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
@@ -81,6 +84,14 @@ public class InputSourceSampler
       SamplerInputRow.SAMPLER_ORDERING_COLUMN
   );
 
+  private final ObjectMapper jsonMapper;
+
+  @Inject
+  public InputSourceSampler(@Json ObjectMapper jsonMapper)
+  {
+    this.jsonMapper = jsonMapper;
+  }
+
   public SamplerResponse sample(
       final InputSource inputSource,
       // inputFormat can be null only if inputSource.needsFormat() = false or 
parser is specified.
@@ -118,7 +129,11 @@ public class InputSourceSampler
         List<SamplerResponseRow> responseRows = new 
ArrayList<>(nonNullSamplerConfig.getNumRows());
         int numRowsIndexed = 0;
 
-        while (responseRows.size() < nonNullSamplerConfig.getNumRows() && 
iterator.hasNext()) {
+        while (
+            responseRows.size() < nonNullSamplerConfig.getNumRows() &&
+            index.getBytesInMemory().get() < 
nonNullSamplerConfig.getMaxBytesInMemory() &&
+            iterator.hasNext()
+        ) {
           final InputRowListPlusRawValues inputRowListPlusRawValues = 
iterator.next();
 
           final List<Map<String, Object>> rawColumnsList = 
inputRowListPlusRawValues.getRawValuesList();
@@ -173,6 +188,7 @@ public class InputSourceSampler
         final List<String> columnNames = index.getColumnNames();
         columnNames.remove(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
 
+
         for (Row row : index) {
           Map<String, Object> parsed = new LinkedHashMap<>();
 
@@ -181,7 +197,9 @@ public class InputSourceSampler
 
           Number sortKey = 
row.getMetric(SamplerInputRow.SAMPLER_ORDERING_COLUMN);
           if (sortKey != null) {
-            responseRows.set(sortKey.intValue(), 
responseRows.get(sortKey.intValue()).withParsed(parsed));
+            SamplerResponseRow theRow = 
responseRows.get(sortKey.intValue()).withParsed(parsed);
+            responseRows.set(sortKey.intValue(), theRow);
+
           }
         }
 
@@ -190,7 +208,30 @@ public class InputSourceSampler
           responseRows = responseRows.subList(0, 
nonNullSamplerConfig.getNumRows());
         }
 
+        if (nonNullSamplerConfig.getMaxClientResponseBytes() > 0) {
+          long estimatedResponseSize = 0;
+          boolean limited = false;
+          int rowCounter = 0;
+          int parsedCounter = 0;
+          for (SamplerResponseRow row : responseRows) {
+            rowCounter++;
+            if (row.getInput() != null) {
+              parsedCounter++;
+            }
+            estimatedResponseSize += jsonMapper.writeValueAsBytes(row).length;
+            if (estimatedResponseSize > 
nonNullSamplerConfig.getMaxClientResponseBytes()) {
+              limited = true;
+              break;
+            }
+          }
+          if (limited) {
+            responseRows = responseRows.subList(0, rowCounter);
+            numRowsIndexed = parsedCounter;
+          }
+        }
+
         int numRowsRead = responseRows.size();
+
         return new SamplerResponse(
             numRowsRead,
             numRowsIndexed,
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
index 6799663cdd..aaa2cd0789 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/sampler/SamplerConfig.java
@@ -22,6 +22,12 @@ package org.apache.druid.indexing.overlord.sampler;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.java.util.common.HumanReadableBytes;
+import org.apache.druid.segment.indexing.DataSchema;
+
+import javax.annotation.Nullable;
 
 public class SamplerConfig
 {
@@ -29,17 +35,27 @@ public class SamplerConfig
   private static final int MAX_NUM_ROWS = 5000;
   private static final int DEFAULT_TIMEOUT_MS = 10000;
 
+
+
   private final int numRows;
   private final int timeoutMs;
 
+  private final long maxBytesInMemory;
+
+  private final long maxClientResponseBytes;
+
   @JsonCreator
   public SamplerConfig(
-      @JsonProperty("numRows") Integer numRows,
-      @JsonProperty("timeoutMs") Integer timeoutMs
+      @JsonProperty("numRows") @Nullable Integer numRows,
+      @JsonProperty("timeoutMs") @Nullable Integer timeoutMs,
+      @JsonProperty("maxBytesInMemory") @Nullable HumanReadableBytes 
maxBytesInMemory,
+      @JsonProperty("maxClientResponseBytes") @Nullable HumanReadableBytes 
maxClientResponseBytes
   )
   {
     this.numRows = numRows != null ? numRows : DEFAULT_NUM_ROWS;
     this.timeoutMs = timeoutMs != null ? timeoutMs : DEFAULT_TIMEOUT_MS;
+    this.maxBytesInMemory = maxBytesInMemory != null ? 
maxBytesInMemory.getBytes() : Long.MAX_VALUE;
+    this.maxClientResponseBytes = maxClientResponseBytes != null ? 
maxClientResponseBytes.getBytes() : 0;
 
     Preconditions.checkArgument(this.numRows <= MAX_NUM_ROWS, "numRows must be 
<= %s", MAX_NUM_ROWS);
   }
@@ -47,9 +63,13 @@ public class SamplerConfig
   /**
    * The maximum number of rows to return in a response. The actual number of returned rows may be less if:
    *   - The sampled source contains less data.
-   *   - {@link SamplerConfig#timeoutMs} elapses before this value is reached.
+   *   - {@link SamplerConfig#timeoutMs} elapses before this value is reached
    *   - {@link org.apache.druid.segment.indexing.granularity.GranularitySpec#isRollup()} is true and input rows get
    *     rolled-up into fewer indexed rows.
+   *   - The incremental index performing the sampling reaches {@link SamplerConfig#getMaxBytesInMemory()} before this
+   *     value is reached
+   *   - The estimated size of the {@link org.apache.druid.client.indexing.SamplerResponse} crosses
+   *     {@link SamplerConfig#getMaxClientResponseBytes()}
    *
    * @return maximum number of sampled rows to return
    */
@@ -70,8 +90,34 @@ public class SamplerConfig
     return timeoutMs;
   }
 
+  /**
+   * Maximum number of bytes in memory that the {@link org.apache.druid.segment.incremental.IncrementalIndex} used by
+   * {@link InputSourceSampler#sample(InputSource, InputFormat, DataSchema, SamplerConfig)} will be allowed to
+   * accumulate before aborting sampling. Particularly useful for limiting footprint of sample operations as well as
+   * overall response size from sample requests. However, it is not directly correlated to response size since it
+   * also contains the "raw" input data, so actual responses will likely be at least twice the size of this value,
+   * depending on factors such as number of transforms, aggregations in the case of rollup, whether all columns
+   * of the input are present in the dimension spec, and so on. If it is preferred to control client response size,
+   * use {@link SamplerConfig#getMaxClientResponseBytes()} instead.
+   */
+  public long getMaxBytesInMemory()
+  {
+    return maxBytesInMemory;
+  }
+
+  /**
+   * Maximum number of bytes to accumulate for a {@link org.apache.druid.client.indexing.SamplerResponse} before
+   * shutting off sampling. To directly control the size of the
+   * {@link org.apache.druid.segment.incremental.IncrementalIndex} used for sampling, use
+   * {@link SamplerConfig#getMaxBytesInMemory()} instead.
+   */
+  public long getMaxClientResponseBytes()
+  {
+    return maxClientResponseBytes;
+  }
+
   public static SamplerConfig empty()
   {
-    return new SamplerConfig(null, null);
+    return new SamplerConfig(null, null, null, null);
   }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
index 1100cb82a1..464579764d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/CsvInputSourceSamplerTest.java
@@ -28,6 +28,7 @@ import org.apache.druid.data.input.impl.CsvInputFormat;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.InlineInputSource;
 import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.junit.Assert;
@@ -60,7 +61,7 @@ public class CsvInputSourceSamplerTest
     );
     final InputSource inputSource = new InlineInputSource(String.join("\n", 
strCsvRows));
     final InputFormat inputFormat = new CsvInputFormat(null, null, null, true, 
0);
-    final InputSourceSampler inputSourceSampler = new InputSourceSampler();
+    final InputSourceSampler inputSourceSampler = new InputSourceSampler(new 
DefaultObjectMapper());
 
     final SamplerResponse response = inputSourceSampler.sample(
         inputSource,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
index 8244f3dcc4..50e2f63c0f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/sampler/InputSourceSamplerTest.java
@@ -49,6 +49,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
 import org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.collect.Utils;
@@ -126,6 +127,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
   @Parameterized.Parameters(name = "parserType = {0}, useInputFormatApi={1}")
   public static Iterable<Object[]> constructorFeeder()
   {
+    OBJECT_MAPPER.registerModules(new SamplerModule().getJacksonModules());
     return ImmutableList.of(
         new Object[]{ParserType.STR_JSON, false},
         new Object[]{ParserType.STR_JSON, true},
@@ -143,7 +145,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
   @Before
   public void setupTest()
   {
-    inputSourceSampler = new InputSourceSampler();
+    inputSourceSampler = new InputSourceSampler(OBJECT_MAPPER);
 
     mapOfRows = new ArrayList<>();
     final List<String> columns = ImmutableList.of("t", "dim1", "dim2", "met1");
@@ -246,7 +248,7 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
         inputSource,
         createInputFormat(),
         null,
-        new SamplerConfig(3, null)
+        new SamplerConfig(3, null, null, null)
     );
 
     Assert.assertEquals(3, response.getNumRowsRead());
@@ -1227,10 +1229,12 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
         STR_JSON_ROWS.stream().limit(STR_JSON_ROWS.size() - 
1).collect(Collectors.joining())
     );
 
-    SamplerResponse response = inputSourceSampler.sample(new 
RecordSupplierInputSource("topicName", new TestRecordSupplier(jsonBlockList), 
true),
-                                                         createInputFormat(),
-                                                         dataSchema,
-                                                         new 
SamplerConfig(200, 3000/*default timeout is 10s, shorten it to speed up*/));
+    SamplerResponse response = inputSourceSampler.sample(
+        new RecordSupplierInputSource("topicName", new 
TestRecordSupplier(jsonBlockList), true),
+        createInputFormat(),
+        dataSchema,
+        new SamplerConfig(200, 3000/*default timeout is 10s, shorten it to 
speed up*/, null, null)
+    );
 
     //
     // the 1st json block contains STR_JSON_ROWS.size() lines, and 2nd json 
block contains STR_JSON_ROWS.size()-1 lines
@@ -1328,6 +1332,109 @@ public class InputSourceSamplerTest extends InitializedNullHandlingTest
     inputSourceSampler.sample(failingReaderInputSource, null, null, null);
   }
 
+  @Test
+  public void testRowLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new 
LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), 
dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(4, null, null, null)
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+
+  }
+
+  @Test
+  public void testMaxBytesInMemoryLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new 
LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), 
dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(null, null, HumanReadableBytes.valueOf(256), null)
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+  }
+
+  @Test
+  public void testMaxClientResponseBytesLimiting() throws IOException
+  {
+    final TimestampSpec timestampSpec = new TimestampSpec("t", null, null);
+    final DimensionsSpec dimensionsSpec = new DimensionsSpec(null);
+    final AggregatorFactory[] aggregatorFactories = {new 
LongSumAggregatorFactory("met1", "met1")};
+    final GranularitySpec granularitySpec = new UniformGranularitySpec(
+        Granularities.DAY,
+        Granularities.HOUR,
+        true,
+        null
+    );
+    final DataSchema dataSchema = createDataSchema(
+        timestampSpec,
+        dimensionsSpec,
+        aggregatorFactories,
+        granularitySpec,
+        null
+    );
+    final InputSource inputSource = createInputSource(getTestRows(), 
dataSchema);
+    final InputFormat inputFormat = createInputFormat();
+
+    SamplerResponse response = inputSourceSampler.sample(
+        inputSource,
+        inputFormat,
+        dataSchema,
+        new SamplerConfig(null, null, null, HumanReadableBytes.valueOf(300))
+    );
+
+    Assert.assertEquals(4, response.getNumRowsRead());
+    Assert.assertEquals(4, response.getNumRowsIndexed());
+    Assert.assertEquals(2, response.getData().size());
+  }
+
   private List<String> getTestRows()
   {
     switch (parserType) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to