This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d384579  Data loader (sampler component) - Kafka/Kinesis samplers (#7566)
d384579 is described below

commit d38457933f1b78a3766198b43ce0b61ecd221179
Author: David Lim <[email protected]>
AuthorDate: Thu May 16 21:26:23 2019 -0600

    Data loader (sampler component) - Kafka/Kinesis samplers (#7566)
    
    * implement Kafka/Kinesis sampler
    
    * add KafkaSamplerSpecTest and KinesisSamplerSpecTest
    
    * code review changes
---
 .../druid/indexing/kafka/KafkaIndexTaskModule.java |   3 +-
 .../druid/indexing/kafka/KafkaSamplerSpec.java     |  91 ++++++
 .../druid/indexing/kafka/KafkaSamplerSpecTest.java | 270 ++++++++++++++++++
 .../kinesis/KinesisIndexingServiceModule.java      |   3 +-
 .../druid/indexing/kinesis/KinesisSamplerSpec.java |  92 +++++++
 .../indexing/kinesis/KinesisSamplerSpecTest.java   | 305 +++++++++++++++++++++
 .../seekablestream/SeekableStreamSamplerSpec.java  | 207 ++++++++++++++
 7 files changed, 969 insertions(+), 2 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
index 34bc3bc..cf780ad 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -48,7 +48,8 @@ public class KafkaIndexTaskModule implements DruidModule
                 // (Older versions of Druid didn't specify a type name and got 
this one by default.)
                 new NamedType(KafkaIndexTaskTuningConfig.class, 
"KafkaTuningConfig"),
                 new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
-                new NamedType(KafkaSupervisorSpec.class, "kafka")
+                new NamedType(KafkaSupervisorSpec.class, "kafka"),
+                new NamedType(KafkaSamplerSpec.class, "kafka")
             )
     );
   }
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
new file mode 100644
index 0000000..63f519b
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Sampler spec for Kafka streams. It plugs a Kafka-backed {@link RecordSupplier} into the generic
+ * {@link SeekableStreamSamplerSpec} machinery, using the consumer properties from the supervisor's
+ * {@link KafkaSupervisorIOConfig}.
+ */
+public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
+{
+  /** JSON mapper handed to every {@link KafkaRecordSupplier} created by this spec. */
+  private final ObjectMapper objectMapper;
+
+  /**
+   * @param ingestionSpec   Kafka supervisor spec to sample; its IO config is read as a {@link KafkaSupervisorIOConfig}
+   * @param samplerConfig   sampler settings; its timeout is also used as the consumer request timeout
+   * @param firehoseSampler injected sampler, passed to the base class
+   * @param objectMapper    injected JSON mapper, passed to the record supplier
+   */
+  @JsonCreator
+  public KafkaSamplerSpec(
+      @JsonProperty("spec") final KafkaSupervisorSpec ingestionSpec,
+      @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
+      @JacksonInject FirehoseSampler firehoseSampler,
+      @JacksonInject ObjectMapper objectMapper
+  )
+  {
+    super(ingestionSpec, samplerConfig, firehoseSampler);
+
+    this.objectMapper = objectMapper;
+  }
+
+  @Override
+  protected Firehose getFirehose(InputRowParser parser)
+  {
+    return new KafkaSamplerFirehose(parser);
+  }
+
+  /**
+   * Firehose that reads sampled records through a {@link KafkaRecordSupplier}. The constructor is
+   * {@code protected}, matching {@code KinesisSamplerFirehose}. This lets a subclass (for example, in a
+   * test) override {@link #getRecordSupplier()} without needing a live broker.
+   */
+  protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose
+  {
+    protected KafkaSamplerFirehose(InputRowParser parser)
+    {
+      super(parser);
+    }
+
+    /**
+     * Builds a {@link KafkaRecordSupplier} for sampling.
+     *
+     * The supervisor's consumer properties are copied, so the IO config's own map is left untouched.
+     * These settings are then forced: offset auto-commit disabled, {@code auto.offset.reset=none},
+     * byte-array key/value deserializers, and a request timeout equal to {@link SamplerConfig#getTimeoutMs()}.
+     */
+    @Override
+    protected RecordSupplier getRecordSupplier()
+    {
+      final ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+      try {
+        // Kafka resolves class-name configs (such as the deserializers below) through the thread context
+        // class loader, so point it at this extension's loader while the consumer is created. Use the spec
+        // class instead of getClass(): a subclass of this firehose may be loaded by a different loader.
+        Thread.currentThread().setContextClassLoader(KafkaSamplerSpec.class.getClassLoader());
+
+        final Map<String, Object> props = new HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());
+
+        // Disable auto-commit so sampling never advances the consumer group's committed offsets.
+        props.put("enable.auto.commit", "false");
+        props.put("auto.offset.reset", "none");
+        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+        props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+        props.put("request.timeout.ms", Integer.toString(samplerConfig.getTimeoutMs()));
+
+        return new KafkaRecordSupplier(props, objectMapper);
+      }
+      finally {
+        // Always restore the caller's context class loader, even if consumer creation fails.
+        Thread.currentThread().setContextClassLoader(currCtxCl);
+      }
+    }
+  }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
new file mode 100644
index 0000000..5820047
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.test.TestingCluster;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerCache;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Runs {@link KafkaSamplerSpec} against an embedded ZooKeeper cluster and Kafka broker, then checks the sampled rows.
+ */
+public class KafkaSamplerSpecTest
+{
+  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final String TOPIC = "sampling";
+  private static final DataSchema DATA_SCHEMA = new DataSchema(
+      "test_ds",
+      objectMapper.convertValue(
+          new StringInputRowParser(
+              new JSONParseSpec(
+                  new TimestampSpec("timestamp", "iso", null),
+                  new DimensionsSpec(
+                      Arrays.asList(
+                          new StringDimensionSchema("dim1"),
+                          new StringDimensionSchema("dim1t"),
+                          new StringDimensionSchema("dim2"),
+                          new LongDimensionSchema("dimLong"),
+                          new FloatDimensionSchema("dimFloat")
+                      ),
+                      null,
+                      null
+                  ),
+                  new JSONPathSpec(true, ImmutableList.of()),
+                  ImmutableMap.of()
+              ),
+              StandardCharsets.UTF_8.name()
+          ),
+          Map.class
+      ),
+      new AggregatorFactory[]{
+          new DoubleSumAggregatorFactory("met1sum", "met1"),
+          new CountAggregatorFactory("rows")
+      },
+      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+      null,
+      objectMapper
+  );
+
+  private static TestingCluster zkServer;
+  private static TestBroker kafkaServer;
+
+  /**
+   * Records published to partition 0 of {@code topic}, in order:
+   * three parseable rows, one row whose timestamp cannot be represented as a long,
+   * one unparseable payload, and one record with a null value.
+   */
+  private static List<ProducerRecord<byte[], byte[]>> generateRecords(String topic)
+  {
+    return ImmutableList.of(
+        new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+        new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")),
+        new ProducerRecord<>(topic, 0, null, null)
+    );
+  }
+
+  @BeforeClass
+  public static void setupClass() throws Exception
+  {
+    zkServer = new TestingCluster(1);
+    zkServer.start();
+
+    kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1, ImmutableMap.of("num.partitions", "2"));
+    kafkaServer.start();
+  }
+
+  /**
+   * Shuts down the broker and then ZooKeeper.
+   *
+   * ZooKeeper is stopped even if the broker fails to close. The null checks keep a failed
+   * {@link #setupClass()} from being masked by a NullPointerException here.
+   */
+  @AfterClass
+  public static void tearDownClass() throws Exception
+  {
+    try {
+      if (kafkaServer != null) {
+        kafkaServer.close();
+      }
+    }
+    finally {
+      if (zkServer != null) {
+        zkServer.stop();
+      }
+    }
+  }
+
+  @Test(timeout = 30_000L)
+  public void testSample()
+  {
+    insertData(generateRecords(TOPIC));
+
+    KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+        DATA_SCHEMA,
+        null,
+        new KafkaSupervisorIOConfig(
+            TOPIC,
+            null,
+            null,
+            null,
+            kafkaServer.consumerProperties(),
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null, null, null),
+        new FirehoseSampler(objectMapper, new SamplerCache(MapCache.create(100000))),
+        objectMapper
+    );
+
+    SamplerResponse response = samplerSpec.sample();
+
+    // Six records were published. The sampler config asks for 5 rows, so the null-valued sixth record
+    // is expected not to appear in the response.
+    Assert.assertNotNull(response.getCacheKey());
+    Assert.assertEquals(5, (int) response.getNumRowsRead());
+    Assert.assertEquals(3, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(5, response.getData().size());
+
+    Iterator<SamplerResponse.SamplerResponseRow> it = response.getData().iterator();
+
+    Assert.assertEquals(
+        indexedRow(
+            "{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+            1199145600000L,
+            "a",
+            "y"
+        ),
+        it.next()
+    );
+    Assert.assertEquals(
+        indexedRow(
+            "{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+            1230768000000L,
+            "b",
+            "y"
+        ),
+        it.next()
+    );
+    Assert.assertEquals(
+        indexedRow(
+            "{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+            1262304000000L,
+            "c",
+            "y"
+        ),
+        it.next()
+    );
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        "{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+        null,
+        true,
+        "Timestamp cannot be represented as a long: [MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+    ), it.next());
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        "unparseable",
+        null,
+        true,
+        "Unable to parse row [unparseable]"
+    ), it.next());
+
+    Assert.assertFalse(it.hasNext());
+  }
+
+  /** Publishes {@code data} to the embedded broker inside a single producer transaction. */
+  private static void insertData(List<ProducerRecord<byte[], byte[]>> data)
+  {
+    try (final KafkaProducer<byte[], byte[]> kafkaProducer = kafkaServer.newProducer()) {
+      kafkaProducer.initTransactions();
+      kafkaProducer.beginTransaction();
+
+      data.forEach(kafkaProducer::send);
+
+      kafkaProducer.commitTransaction();
+    }
+  }
+
+  /** Serializes one test event to JSON bytes. Every field is written as a string. */
+  private static byte[] jb(String timestamp, String dim1, String dim2, String dimLong, String dimFloat, String met1)
+  {
+    try {
+      return new ObjectMapper().writeValueAsBytes(
+          ImmutableMap.builder()
+                      .put("timestamp", timestamp)
+                      .put("dim1", dim1)
+                      .put("dim2", dim2)
+                      .put("dimLong", dimLong)
+                      .put("dimFloat", dimFloat)
+                      .put("met1", met1)
+                      .build()
+      );
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Expected response row for a record that parsed and indexed successfully.
+   * Such records are generated with dimLong=10, dimFloat=20.0 and met1=1.0.
+   */
+  private static SamplerResponse.SamplerResponseRow indexedRow(String raw, long time, String dim1, String dim2)
+  {
+    return new SamplerResponse.SamplerResponseRow(
+        raw,
+        ImmutableMap.<String, Object>builder()
+            .put("__time", time)
+            .put("dim1", dim1)
+            .put("dim2", dim2)
+            .put("dimLong", 10L)
+            .put("dimFloat", 20.0F)
+            .put("rows", 1L)
+            .put("met1sum", 1.0)
+            .build(),
+        null,
+        null
+    );
+  }
+}
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
index cba5166..3cea45a 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
@@ -48,7 +48,8 @@ public class KinesisIndexingServiceModule implements DruidModule
                 new NamedType(KinesisDataSourceMetadata.class, "kinesis"),
                 new NamedType(KinesisIndexTaskIOConfig.class, "kinesis"),
                 new NamedType(KinesisSupervisorTuningConfig.class, "kinesis"),
-                new NamedType(KinesisSupervisorSpec.class, "kinesis")
+                new NamedType(KinesisSupervisorSpec.class, "kinesis"),
+                new NamedType(KinesisSamplerSpec.class, "kinesis")
             )
     );
   }
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
new file mode 100644
index 0000000..1fd36f2
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.inject.name.Named;
+import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
+import 
org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+
+/**
+ * Sampler spec for Kinesis streams. It builds a {@link KinesisRecordSupplier} from the supervisor's IO and
+ * tuning configs and authenticates with the injected {@code kinesis}-named AWS credentials.
+ */
+public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
+{
+  /** Value passed as the record supplier's fetch-thread count while sampling. */
+  private static final int SAMPLER_FETCH_THREADS = 1;
+
+  private final AWSCredentialsConfig awsCredentialsConfig;
+
+  /**
+   * @param ingestionSpec        Kinesis supervisor spec to sample
+   * @param samplerConfig        sampler settings
+   * @param firehoseSampler      injected sampler, passed to the base class
+   * @param awsCredentialsConfig injected credentials used when creating the Kinesis client
+   */
+  @JsonCreator
+  public KinesisSamplerSpec(
+      @JsonProperty("spec") final KinesisSupervisorSpec ingestionSpec,
+      @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
+      @JacksonInject FirehoseSampler firehoseSampler,
+      @JacksonInject @Named("kinesis") AWSCredentialsConfig awsCredentialsConfig
+  )
+  {
+    super(ingestionSpec, samplerConfig, firehoseSampler);
+    this.awsCredentialsConfig = awsCredentialsConfig;
+  }
+
+  @Override
+  protected Firehose getFirehose(InputRowParser parser)
+  {
+    return new KinesisSamplerFirehose(parser);
+  }
+
+  /** Firehose that reads sampled records from Kinesis. Subclasses may override the record supplier. */
+  protected class KinesisSamplerFirehose extends SeekableStreamSamplerFirehose
+  {
+    protected KinesisSamplerFirehose(InputRowParser parser)
+    {
+      super(parser);
+    }
+
+    /**
+     * Creates a new Kinesis client and wraps it in a {@link KinesisRecordSupplier}.
+     * Settings come from the supervisor IO config (endpoint, role, fetch sizing, de-aggregation)
+     * and tuning config (buffer and timeout settings).
+     */
+    @Override
+    protected RecordSupplier getRecordSupplier()
+    {
+      final KinesisSupervisorIOConfig kinesisIoConfig =
+          (KinesisSupervisorIOConfig) KinesisSamplerSpec.this.ioConfig;
+      final KinesisSupervisorTuningConfig kinesisTuningConfig =
+          (KinesisSupervisorTuningConfig) KinesisSamplerSpec.this.tuningConfig;
+
+      return new KinesisRecordSupplier(
+          KinesisRecordSupplier.getAmazonKinesisClient(
+              kinesisIoConfig.getEndpoint(),
+              awsCredentialsConfig,
+              kinesisIoConfig.getAwsAssumedRoleArn(),
+              kinesisIoConfig.getAwsExternalId()
+          ),
+          kinesisIoConfig.getRecordsPerFetch(),
+          kinesisIoConfig.getFetchDelayMillis(),
+          SAMPLER_FETCH_THREADS,
+          kinesisIoConfig.isDeaggregate(),
+          kinesisTuningConfig.getRecordBufferSize(),
+          kinesisTuningConfig.getRecordBufferOfferTimeout(),
+          kinesisTuningConfig.getRecordBufferFullWait(),
+          kinesisTuningConfig.getFetchSequenceNumberTimeout(),
+          kinesisTuningConfig.getMaxRecordsPerPoll()
+      );
+    }
+  }
+}
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
new file mode 100644
index 0000000..62b1226
--- /dev/null
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerCache;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class KinesisSamplerSpecTest extends EasyMockSupport
+{
+  private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+  private static final String STREAM = "sampling";
+  private static final String SHARD_ID = "1";
+  private static final DataSchema DATA_SCHEMA = new DataSchema(
+      "test_ds",
+      objectMapper.convertValue(
+          new StringInputRowParser(
+              new JSONParseSpec(
+                  new TimestampSpec("timestamp", "iso", null),
+                  new DimensionsSpec(
+                      Arrays.asList(
+                          new StringDimensionSchema("dim1"),
+                          new StringDimensionSchema("dim1t"),
+                          new StringDimensionSchema("dim2"),
+                          new LongDimensionSchema("dimLong"),
+                          new FloatDimensionSchema("dimFloat")
+                      ),
+                      null,
+                      null
+                  ),
+                  new JSONPathSpec(true, ImmutableList.of()),
+                  ImmutableMap.of()
+              ),
+              StandardCharsets.UTF_8.name()
+          ),
+          Map.class
+      ),
+      new AggregatorFactory[]{
+          new DoubleSumAggregatorFactory("met1sum", "met1"),
+          new CountAggregatorFactory("rows")
+      },
+      new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+      null,
+      objectMapper
+  );
+
+  private final KinesisRecordSupplier recordSupplier = 
mock(KinesisRecordSupplier.class);
+
+  private static List<OrderedPartitionableRecord<String, String>> 
generateRecords(String stream)
+  {
+    return ImmutableList.of(
+        new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c", 
"y", "10", "20.0", "1.0")),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "5",
+            jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+        ),
+        new OrderedPartitionableRecord<>(
+            stream,
+            "1",
+            "6",
+            Collections.singletonList(StringUtils.toUtf8("unparseable"))
+        ),
+        new OrderedPartitionableRecord<>(stream, "1", "8", 
Collections.singletonList(StringUtils.toUtf8("{}")))
+    );
+  }
+
+  @Test(timeout = 10_000L)
+  public void testSample() throws Exception
+  {
+    
expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
+
+    recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM, 
SHARD_ID)));
+    expectLastCall().once();
+
+    recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM, 
SHARD_ID)));
+    expectLastCall().once();
+
+    
expect(recordSupplier.poll(anyLong())).andReturn(generateRecords(STREAM)).once();
+
+    recordSupplier.close();
+    expectLastCall().once();
+
+    replayAll();
+
+    KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+        DATA_SCHEMA,
+        null,
+        new KinesisSupervisorIOConfig(
+            STREAM,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            true,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            null,
+            false
+        ),
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null
+    );
+
+    KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
+        supervisorSpec,
+        new SamplerConfig(5, null, null, null),
+        new FirehoseSampler(objectMapper, new 
SamplerCache(MapCache.create(100000))),
+        null
+    );
+
+    SamplerResponse response = samplerSpec.sample();
+
+    verifyAll();
+
+    Assert.assertNotNull(response.getCacheKey());
+    Assert.assertEquals(5, (int) response.getNumRowsRead());
+    Assert.assertEquals(3, (int) response.getNumRowsIndexed());
+    Assert.assertEquals(5, response.getData().size());
+
+    Iterator<SamplerResponse.SamplerResponseRow> it = 
response.getData().iterator();
+
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        
"{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+        ImmutableMap.<String, Object>builder()
+            .put("__time", 1199145600000L)
+            .put("dim1", "a")
+            .put("dim2", "y")
+            .put("dimLong", 10L)
+            .put("dimFloat", 20.0F)
+            .put("rows", 1L)
+            .put("met1sum", 1.0)
+            .build(),
+        null,
+        null
+    ), it.next());
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        
"{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+        ImmutableMap.<String, Object>builder()
+            .put("__time", 1230768000000L)
+            .put("dim1", "b")
+            .put("dim2", "y")
+            .put("dimLong", 10L)
+            .put("dimFloat", 20.0F)
+            .put("rows", 1L)
+            .put("met1sum", 1.0)
+            .build(),
+        null,
+        null
+    ), it.next());
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        
"{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+        ImmutableMap.<String, Object>builder()
+            .put("__time", 1262304000000L)
+            .put("dim1", "c")
+            .put("dim2", "y")
+            .put("dimLong", 10L)
+            .put("dimFloat", 20.0F)
+            .put("rows", 1L)
+            .put("met1sum", 1.0)
+            .build(),
+        null,
+        null
+    ), it.next());
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        
"{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+        null,
+        true,
+        "Timestamp cannot be represented as a long: 
[MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z, 
event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10, 
dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+    ), it.next());
+    Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+        "unparseable",
+        null,
+        true,
+        "Unable to parse row [unparseable]"
+    ), it.next());
+
+    Assert.assertFalse(it.hasNext());
+  }
+
+  private static List<byte[]> jb(String ts, String dim1, String dim2, String 
dimLong, String dimFloat, String met1)
+  {
+    try {
+      return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
+          ImmutableMap.builder()
+                      .put("timestamp", ts)
+                      .put("dim1", dim1)
+                      .put("dim2", dim2)
+                      .put("dimLong", dimLong)
+                      .put("dimFloat", dimFloat)
+                      .put("met1", met1)
+                      .build()
+      ));
+    }
+    catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private class TestableKinesisSamplerSpec extends KinesisSamplerSpec
+  {
+    private TestableKinesisSamplerSpec(
+        KinesisSupervisorSpec ingestionSpec,
+        SamplerConfig samplerConfig,
+        FirehoseSampler firehoseSampler,
+        AWSCredentialsConfig awsCredentialsConfig
+    )
+    {
+      super(ingestionSpec, samplerConfig, firehoseSampler, 
awsCredentialsConfig);
+    }
+
+    @Override
+    protected Firehose getFirehose(InputRowParser parser)
+    {
+      return new KinesisSamplerFirehose(parser)
+      {
+        @Override
+        protected RecordSupplier getRecordSupplier()
+        {
+          return recordSupplier;
+        }
+      };
+    }
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
new file mode 100644
index 0000000..c7303bc
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import org.apache.druid.indexing.overlord.sampler.SamplerSpec;
+import 
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.utils.Runnables;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base {@link SamplerSpec} for seekable-stream supervisors. Sampling is delegated to a {@link FirehoseSampler}
+ * whose firehose reads records from every partition of {@code ioConfig.getStream()} through a {@link RecordSupplier}.
+ *
+ * @param <PartitionIdType>    type of the stream's partition identifiers
+ * @param <SequenceOffsetType> type of the stream's sequence offsets
+ */
+public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType> implements SamplerSpec
+{
+  /** Timeout, in milliseconds, passed to each {@link RecordSupplier#poll} call. */
+  private static final int POLL_TIMEOUT_MS = 100;
+
+  private final DataSchema dataSchema;
+  private final FirehoseSampler firehoseSampler;
+
+  protected final SeekableStreamSupervisorIOConfig ioConfig;
+  protected final SeekableStreamSupervisorTuningConfig tuningConfig;
+  protected final SamplerConfig samplerConfig;
+
+  /**
+   * @param ingestionSpec   supervisor spec supplying the data schema, IO config and tuning config; it and its IO
+   *                        config must be non-null
+   * @param samplerConfig   sampler settings, passed through to {@link FirehoseSampler#sample}
+   * @param firehoseSampler sampler that reads from the firehose produced by {@link #getFirehose}
+   */
+  public SeekableStreamSamplerSpec(
+      final SeekableStreamSupervisorSpec ingestionSpec,
+      final SamplerConfig samplerConfig,
+      final FirehoseSampler firehoseSampler
+  )
+  {
+    this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
+    this.ioConfig = Preconditions.checkNotNull(ingestionSpec.getIoConfig(), "[spec.ioConfig] is required");
+    this.tuningConfig = ingestionSpec.getTuningConfig();
+    this.samplerConfig = samplerConfig;
+    this.firehoseSampler = firehoseSampler;
+  }
+
+  /**
+   * Runs the firehose sampler with a firehose factory whose {@code connect} delegates to {@link #getFirehose}.
+   */
+  @Override
+  public SamplerResponse sample()
+  {
+    return firehoseSampler.sample(
+        new FirehoseFactory()
+        {
+          @Override
+          public Firehose connect(InputRowParser parser, @Nullable File temporaryDirectory)
+          {
+            // temporaryDirectory is not used by stream-backed firehoses.
+            return getFirehose(parser);
+          }
+        },
+        dataSchema,
+        samplerConfig
+    );
+  }
+
+  /**
+   * Creates the firehose the sampler reads from.
+   *
+   * @param parser parser applied to each raw record payload
+   */
+  protected abstract Firehose getFirehose(InputRowParser parser);
+
+  /**
+   * Firehose that assigns all partitions of the configured stream to a {@link RecordSupplier}, seeks them to the
+   * earliest or latest offset (per {@code ioConfig.isUseEarliestSequenceNumber()}), and parses records as they are
+   * polled. It reports more data until {@link #close()} is called.
+   */
+  protected abstract class SeekableStreamSamplerFirehose implements Firehose
+  {
+    private final InputRowParser parser;
+    private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
+
+    /** Records from the most recent poll that have not yet been consumed. */
+    private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;
+    /** Payloads of the current record that have not yet been parsed. */
+    private Iterator<byte[]> recordDataIterator;
+
+    private volatile boolean closed = false;
+
+    /**
+     * @param parser parser applied to each record payload; a {@link StringInputRowParser} is first reset via
+     *               {@code startFileFromBeginning()}
+     * @throws SamplerException if seeking to the partitions is interrupted; the record supplier is closed and the
+     *                          thread's interrupt status is restored
+     */
+    protected SeekableStreamSamplerFirehose(InputRowParser parser)
+    {
+      this.parser = parser;
+
+      if (parser instanceof StringInputRowParser) {
+        ((StringInputRowParser) parser).startFileFromBeginning();
+      }
+
+      // NOTE: getRecordSupplier() is an overridable method called from this constructor, so implementations must
+      // not depend on state the subclass initializes in its own constructor or field initializers.
+      this.recordSupplier = getRecordSupplier();
+
+      try {
+        assignAndSeek();
+      }
+      catch (InterruptedException e) {
+        // Restore the interrupt status so code further up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        closeSupplierAfterFailure(e);
+        throw new SamplerException(e, "Exception while seeking to partitions");
+      }
+      catch (RuntimeException e) {
+        // A firehose whose constructor fails is never returned to anyone, so nobody else could close the supplier.
+        closeSupplierAfterFailure(e);
+        throw e;
+      }
+    }
+
+    @Override
+    public boolean hasMore()
+    {
+      return !closed;
+    }
+
+    /**
+     * Returns the next parsed row, or {@code null} when no row is currently available.
+     *
+     * @throws ParseException if the next payload cannot be parsed
+     */
+    @Nullable
+    @Override
+    public InputRow nextRow()
+    {
+      InputRowPlusRaw row = nextRowWithRaw();
+      if (row.getParseException() != null) {
+        throw row.getParseException();
+      }
+
+      return row.getInputRow();
+    }
+
+    /**
+     * Returns the next payload together with the first row parsed from it, polling the supplier (with timeout
+     * {@code POLL_TIMEOUT_MS}) when buffered records are exhausted. A result with a null row and null raw bytes is
+     * returned when a poll yields no records or a record carries no payloads. Parse failures are returned in the
+     * result rather than thrown.
+     */
+    @Override
+    public InputRowPlusRaw nextRowWithRaw()
+    {
+      if (recordDataIterator == null || !recordDataIterator.hasNext()) {
+        if (recordIterator == null || !recordIterator.hasNext()) {
+          recordIterator = recordSupplier.poll(POLL_TIMEOUT_MS).iterator();
+
+          if (!recordIterator.hasNext()) {
+            return InputRowPlusRaw.of((InputRow) null, null);
+          }
+        }
+
+        recordDataIterator = recordIterator.next().getData().iterator();
+
+        if (!recordDataIterator.hasNext()) {
+          return InputRowPlusRaw.of((InputRow) null, null);
+        }
+      }
+
+      byte[] raw = recordDataIterator.next();
+
+      try {
+        List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(raw));
+        // Only the first row is kept; any further rows parsed from the same payload are dropped.
+        return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
+      }
+      catch (ParseException e) {
+        return InputRowPlusRaw.of(raw, e);
+      }
+    }
+
+    @Override
+    public Runnable commit()
+    {
+      return Runnables.getNoopRunnable();
+    }
+
+    /**
+     * Closes the record supplier. Repeated calls are no-ops; afterwards {@link #hasMore()} returns false.
+     */
+    @Override
+    public void close()
+    {
+      if (closed) {
+        return;
+      }
+
+      closed = true;
+      recordSupplier.close();
+    }
+
+    /**
+     * Best-effort close of the record supplier after construction failed. A failure while closing is attached to
+     * {@code cause} as a suppressed exception instead of masking it.
+     */
+    private void closeSupplierAfterFailure(Exception cause)
+    {
+      closed = true;
+      if (recordSupplier == null) {
+        return;
+      }
+      try {
+        recordSupplier.close();
+      }
+      catch (RuntimeException suppressed) {
+        cause.addSuppressed(suppressed);
+      }
+    }
+
+    /**
+     * Assigns every partition of {@code ioConfig.getStream()} to the record supplier, then seeks all of them to the
+     * earliest offset if {@code ioConfig.isUseEarliestSequenceNumber()} is true, otherwise to the latest.
+     */
+    private void assignAndSeek() throws InterruptedException
+    {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(ioConfig.getStream())
+          .stream()
+          .map(x -> StreamPartition.of(ioConfig.getStream(), x))
+          .collect(Collectors.toSet());
+
+      recordSupplier.assign(partitions);
+
+      if (ioConfig.isUseEarliestSequenceNumber()) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
+    }
+
+    /**
+     * Supplies the record supplier this firehose reads from. Called once, from the constructor; the firehose closes
+     * the returned supplier in {@link #close()} or when construction fails.
+     */
+    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> getRecordSupplier();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to