This is an automated email from the ASF dual-hosted git repository.
cwylie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new d384579 Data loader (sampler component) - Kafka/Kinesis samplers (#7566)
d384579 is described below
commit d38457933f1b78a3766198b43ce0b61ecd221179
Author: David Lim <[email protected]>
AuthorDate: Thu May 16 21:26:23 2019 -0600
Data loader (sampler component) - Kafka/Kinesis samplers (#7566)
* implement Kafka/Kinesis sampler
* add KafkaSamplerSpecTest and KinesisSamplerSpecTest
* code review changes
---
.../druid/indexing/kafka/KafkaIndexTaskModule.java | 3 +-
.../druid/indexing/kafka/KafkaSamplerSpec.java | 91 ++++++
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 270 ++++++++++++++++++
.../kinesis/KinesisIndexingServiceModule.java | 3 +-
.../druid/indexing/kinesis/KinesisSamplerSpec.java | 92 +++++++
.../indexing/kinesis/KinesisSamplerSpecTest.java | 305 +++++++++++++++++++++
.../seekablestream/SeekableStreamSamplerSpec.java | 207 ++++++++++++++
7 files changed, 969 insertions(+), 2 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
index 34bc3bc..cf780ad 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -48,7 +48,8 @@ public class KafkaIndexTaskModule implements DruidModule
// (Older versions of Druid didn't specify a type name and got
this one by default.)
new NamedType(KafkaIndexTaskTuningConfig.class,
"KafkaTuningConfig"),
new NamedType(KafkaSupervisorTuningConfig.class, "kafka"),
- new NamedType(KafkaSupervisorSpec.class, "kafka")
+ new NamedType(KafkaSupervisorSpec.class, "kafka"),
+ new NamedType(KafkaSamplerSpec.class, "kafka")
)
);
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
new file mode 100644
index 0000000..63f519b
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaSamplerSpec.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class KafkaSamplerSpec extends SeekableStreamSamplerSpec
+{
+ private final ObjectMapper objectMapper;
+
+ @JsonCreator
+ public KafkaSamplerSpec(
+ @JsonProperty("spec") final KafkaSupervisorSpec ingestionSpec,
+ @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
+ @JacksonInject FirehoseSampler firehoseSampler,
+ @JacksonInject ObjectMapper objectMapper
+ )
+ {
+ super(ingestionSpec, samplerConfig, firehoseSampler);
+
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ protected Firehose getFirehose(InputRowParser parser)
+ {
+ return new KafkaSamplerFirehose(parser);
+ }
+
+ protected class KafkaSamplerFirehose extends SeekableStreamSamplerFirehose
+ {
+ private KafkaSamplerFirehose(InputRowParser parser)
+ {
+ super(parser);
+ }
+
+ @Override
+ protected RecordSupplier getRecordSupplier()
+ {
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ try {
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+ final Map<String, Object> props = new
HashMap<>(((KafkaSupervisorIOConfig) ioConfig).getConsumerProperties());
+
+ props.put("enable.auto.commit", "false");
+ props.put("auto.offset.reset", "none");
+ props.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("value.deserializer", ByteArrayDeserializer.class.getName());
+ props.put("request.timeout.ms",
Integer.toString(samplerConfig.getTimeoutMs()));
+
+ return new KafkaRecordSupplier(props, objectMapper);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+ }
+ }
+}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
new file mode 100644
index 0000000..5820047
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.apache.curator.test.TestingCluster;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.test.TestBroker;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerCache;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class KafkaSamplerSpecTest
+{
+ private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ private static final String TOPIC = "sampling";
+ private static final DataSchema DATA_SCHEMA = new DataSchema(
+ "test_ds",
+ objectMapper.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ ),
+ null,
+ null
+ ),
+ new JSONPathSpec(true, ImmutableList.of()),
+ ImmutableMap.of()
+ ),
+ StandardCharsets.UTF_8.name()
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+ null,
+ objectMapper
+ );
+
+ private static TestingCluster zkServer;
+ private static TestBroker kafkaServer;
+
+ private static List<ProducerRecord<byte[], byte[]>> generateRecords(String
topic)
+ {
+ return ImmutableList.of(
+ new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10",
"20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")),
+ new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8("unparseable")),
+ new ProducerRecord<>(topic, 0, null, null)
+ );
+ }
+
+ @BeforeClass
+ public static void setupClass() throws Exception
+ {
+ zkServer = new TestingCluster(1);
+ zkServer.start();
+
+ kafkaServer = new TestBroker(zkServer.getConnectString(), null, 1,
ImmutableMap.of("num.partitions", "2"));
+ kafkaServer.start();
+ }
+
+ @AfterClass
+ public static void tearDownClass() throws Exception
+ {
+ kafkaServer.close();
+ zkServer.stop();
+ }
+
+ @Test(timeout = 30_000L)
+ public void testSample()
+ {
+ insertData(generateRecords(TOPIC));
+
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
+ DATA_SCHEMA,
+ null,
+ new KafkaSupervisorIOConfig(
+ TOPIC,
+ null,
+ null,
+ null,
+ kafkaServer.consumerProperties(),
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
+ supervisorSpec,
+ new SamplerConfig(5, null, null, null),
+ new FirehoseSampler(objectMapper, new
SamplerCache(MapCache.create(100000))),
+ objectMapper
+ );
+
+ SamplerResponse response = samplerSpec.sample();
+
+ Assert.assertNotNull(response.getCacheKey());
+ Assert.assertEquals(5, (int) response.getNumRowsRead());
+ Assert.assertEquals(3, (int) response.getNumRowsIndexed());
+ Assert.assertEquals(5, response.getData().size());
+
+ Iterator<SamplerResponse.SamplerResponseRow> it =
response.getData().iterator();
+
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1199145600000L)
+ .put("dim1", "a")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1230768000000L)
+ .put("dim1", "b")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1262304000000L)
+ .put("dim1", "c")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ null,
+ true,
+ "Timestamp cannot be represented as a long:
[MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z,
event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10,
dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+ "unparseable",
+ null,
+ true,
+ "Unable to parse row [unparseable]"
+ ), it.next());
+
+ Assert.assertFalse(it.hasNext());
+ }
+
+ private static void insertData(List<ProducerRecord<byte[], byte[]>> data)
+ {
+ try (final KafkaProducer<byte[], byte[]> kafkaProducer =
kafkaServer.newProducer()) {
+ kafkaProducer.initTransactions();
+ kafkaProducer.beginTransaction();
+
+ data.forEach(kafkaProducer::send);
+
+ kafkaProducer.commitTransaction();
+ }
+ }
+
+ private static byte[] jb(String timestamp, String dim1, String dim2, String
dimLong, String dimFloat, String met1)
+ {
+ try {
+ return new ObjectMapper().writeValueAsBytes(
+ ImmutableMap.builder()
+ .put("timestamp", timestamp)
+ .put("dim1", dim1)
+ .put("dim2", dim2)
+ .put("dimLong", dimLong)
+ .put("dimFloat", dimFloat)
+ .put("met1", met1)
+ .build()
+ );
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
index cba5166..3cea45a 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexingServiceModule.java
@@ -48,7 +48,8 @@ public class KinesisIndexingServiceModule implements
DruidModule
new NamedType(KinesisDataSourceMetadata.class, "kinesis"),
new NamedType(KinesisIndexTaskIOConfig.class, "kinesis"),
new NamedType(KinesisSupervisorTuningConfig.class, "kinesis"),
- new NamedType(KinesisSupervisorSpec.class, "kinesis")
+ new NamedType(KinesisSupervisorSpec.class, "kinesis"),
+ new NamedType(KinesisSamplerSpec.class, "kinesis")
)
);
}
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
new file mode 100644
index 0000000..1fd36f2
--- /dev/null
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpec.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.inject.name.Named;
+import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
+import
org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.seekablestream.SeekableStreamSamplerSpec;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+
+public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
+{
+ private final AWSCredentialsConfig awsCredentialsConfig;
+
+ @JsonCreator
+ public KinesisSamplerSpec(
+ @JsonProperty("spec") final KinesisSupervisorSpec ingestionSpec,
+ @JsonProperty("samplerConfig") final SamplerConfig samplerConfig,
+ @JacksonInject FirehoseSampler firehoseSampler,
+ @JacksonInject @Named("kinesis") AWSCredentialsConfig
awsCredentialsConfig
+ )
+ {
+ super(ingestionSpec, samplerConfig, firehoseSampler);
+
+ this.awsCredentialsConfig = awsCredentialsConfig;
+ }
+
+ @Override
+ protected Firehose getFirehose(InputRowParser parser)
+ {
+ return new KinesisSamplerFirehose(parser);
+ }
+
+ protected class KinesisSamplerFirehose extends SeekableStreamSamplerFirehose
+ {
+ protected KinesisSamplerFirehose(InputRowParser parser)
+ {
+ super(parser);
+ }
+
+ @Override
+ protected RecordSupplier getRecordSupplier()
+ {
+ KinesisSupervisorIOConfig ioConfig = (KinesisSupervisorIOConfig)
KinesisSamplerSpec.this.ioConfig;
+ KinesisSupervisorTuningConfig tuningConfig =
((KinesisSupervisorTuningConfig) KinesisSamplerSpec.this.tuningConfig);
+
+ return new KinesisRecordSupplier(
+ KinesisRecordSupplier.getAmazonKinesisClient(
+ ioConfig.getEndpoint(),
+ awsCredentialsConfig,
+ ioConfig.getAwsAssumedRoleArn(),
+ ioConfig.getAwsExternalId()
+ ),
+ ioConfig.getRecordsPerFetch(),
+ ioConfig.getFetchDelayMillis(),
+ 1,
+ ioConfig.isDeaggregate(),
+ tuningConfig.getRecordBufferSize(),
+ tuningConfig.getRecordBufferOfferTimeout(),
+ tuningConfig.getRecordBufferFullWait(),
+ tuningConfig.getFetchSequenceNumberTimeout(),
+ tuningConfig.getMaxRecordsPerPoll()
+ );
+ }
+ }
+}
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
new file mode 100644
index 0000000..62b1226
--- /dev/null
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kinesis;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.druid.client.cache.MapCache;
+import org.apache.druid.common.aws.AWSCredentialsConfig;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorIOConfig;
+import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorSpec;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerCache;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.java.util.common.parsers.JSONPathSpec;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
+import org.easymock.EasyMockSupport;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyLong;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+
+public class KinesisSamplerSpecTest extends EasyMockSupport
+{
+ private static final ObjectMapper objectMapper = TestHelper.makeJsonMapper();
+ private static final String STREAM = "sampling";
+ private static final String SHARD_ID = "1";
+ private static final DataSchema DATA_SCHEMA = new DataSchema(
+ "test_ds",
+ objectMapper.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ Arrays.asList(
+ new StringDimensionSchema("dim1"),
+ new StringDimensionSchema("dim1t"),
+ new StringDimensionSchema("dim2"),
+ new LongDimensionSchema("dimLong"),
+ new FloatDimensionSchema("dimFloat")
+ ),
+ null,
+ null
+ ),
+ new JSONPathSpec(true, ImmutableList.of()),
+ ImmutableMap.of()
+ ),
+ StandardCharsets.UTF_8.name()
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{
+ new DoubleSumAggregatorFactory("met1sum", "met1"),
+ new CountAggregatorFactory("rows")
+ },
+ new UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null),
+ null,
+ objectMapper
+ );
+
+ private final KinesisRecordSupplier recordSupplier =
mock(KinesisRecordSupplier.class);
+
+ private static List<OrderedPartitionableRecord<String, String>>
generateRecords(String stream)
+ {
+ return ImmutableList.of(
+ new OrderedPartitionableRecord<>(stream, "1", "0", jb("2008", "a",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "1", jb("2009", "b",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(stream, "1", "2", jb("2010", "c",
"y", "10", "20.0", "1.0")),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "5",
+ jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")
+ ),
+ new OrderedPartitionableRecord<>(
+ stream,
+ "1",
+ "6",
+ Collections.singletonList(StringUtils.toUtf8("unparseable"))
+ ),
+ new OrderedPartitionableRecord<>(stream, "1", "8",
Collections.singletonList(StringUtils.toUtf8("{}")))
+ );
+ }
+
+ @Test(timeout = 10_000L)
+ public void testSample() throws Exception
+ {
+
expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).once();
+
+ recordSupplier.assign(ImmutableSet.of(StreamPartition.of(STREAM,
SHARD_ID)));
+ expectLastCall().once();
+
+ recordSupplier.seekToEarliest(ImmutableSet.of(StreamPartition.of(STREAM,
SHARD_ID)));
+ expectLastCall().once();
+
+
expect(recordSupplier.poll(anyLong())).andReturn(generateRecords(STREAM)).once();
+
+ recordSupplier.close();
+ expectLastCall().once();
+
+ replayAll();
+
+ KinesisSupervisorSpec supervisorSpec = new KinesisSupervisorSpec(
+ DATA_SCHEMA,
+ null,
+ new KinesisSupervisorIOConfig(
+ STREAM,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ false
+ ),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null
+ );
+
+ KinesisSamplerSpec samplerSpec = new TestableKinesisSamplerSpec(
+ supervisorSpec,
+ new SamplerConfig(5, null, null, null),
+ new FirehoseSampler(objectMapper, new
SamplerCache(MapCache.create(100000))),
+ null
+ );
+
+ SamplerResponse response = samplerSpec.sample();
+
+ verifyAll();
+
+ Assert.assertNotNull(response.getCacheKey());
+ Assert.assertEquals(5, (int) response.getNumRowsRead());
+ Assert.assertEquals(3, (int) response.getNumRowsIndexed());
+ Assert.assertEquals(5, response.getData().size());
+
+ Iterator<SamplerResponse.SamplerResponseRow> it =
response.getData().iterator();
+
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2008\",\"dim1\":\"a\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1199145600000L)
+ .put("dim1", "a")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2009\",\"dim1\":\"b\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1230768000000L)
+ .put("dim1", "b")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"2010\",\"dim1\":\"c\",\"dim2\":\"y\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ ImmutableMap.<String, Object>builder()
+ .put("__time", 1262304000000L)
+ .put("dim1", "c")
+ .put("dim2", "y")
+ .put("dimLong", 10L)
+ .put("dimFloat", 20.0F)
+ .put("rows", 1L)
+ .put("met1sum", 1.0)
+ .build(),
+ null,
+ null
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+
"{\"timestamp\":\"246140482-04-24T15:36:27.903Z\",\"dim1\":\"x\",\"dim2\":\"z\",\"dimLong\":\"10\",\"dimFloat\":\"20.0\",\"met1\":\"1.0\"}",
+ null,
+ true,
+ "Timestamp cannot be represented as a long:
[MapBasedInputRow{timestamp=246140482-04-24T15:36:27.903Z,
event={timestamp=246140482-04-24T15:36:27.903Z, dim1=x, dim2=z, dimLong=10,
dimFloat=20.0, met1=1.0}, dimensions=[dim1, dim1t, dim2, dimLong, dimFloat]}]"
+ ), it.next());
+ Assert.assertEquals(new SamplerResponse.SamplerResponseRow(
+ "unparseable",
+ null,
+ true,
+ "Unable to parse row [unparseable]"
+ ), it.next());
+
+ Assert.assertFalse(it.hasNext());
+ }
+
+ private static List<byte[]> jb(String ts, String dim1, String dim2, String
dimLong, String dimFloat, String met1)
+ {
+ try {
+ return Collections.singletonList(new ObjectMapper().writeValueAsBytes(
+ ImmutableMap.builder()
+ .put("timestamp", ts)
+ .put("dim1", dim1)
+ .put("dim2", dim2)
+ .put("dimLong", dimLong)
+ .put("dimFloat", dimFloat)
+ .put("met1", met1)
+ .build()
+ ));
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private class TestableKinesisSamplerSpec extends KinesisSamplerSpec
+ {
+ private TestableKinesisSamplerSpec(
+ KinesisSupervisorSpec ingestionSpec,
+ SamplerConfig samplerConfig,
+ FirehoseSampler firehoseSampler,
+ AWSCredentialsConfig awsCredentialsConfig
+ )
+ {
+ super(ingestionSpec, samplerConfig, firehoseSampler,
awsCredentialsConfig);
+ }
+
+ @Override
+ protected Firehose getFirehose(InputRowParser parser)
+ {
+ return new KinesisSamplerFirehose(parser)
+ {
+ @Override
+ protected RecordSupplier getRecordSupplier()
+ {
+ return recordSupplier;
+ }
+ };
+ }
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
new file mode 100644
index 0000000..c7303bc
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpec.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.Firehose;
+import org.apache.druid.data.input.FirehoseFactory;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowPlusRaw;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.indexing.overlord.sampler.FirehoseSampler;
+import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
+import org.apache.druid.indexing.overlord.sampler.SamplerException;
+import org.apache.druid.indexing.overlord.sampler.SamplerResponse;
+import org.apache.druid.indexing.overlord.sampler.SamplerSpec;
+import
org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord;
+import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
+import org.apache.druid.indexing.seekablestream.common.StreamPartition;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
+import org.apache.druid.java.util.common.parsers.ParseException;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.utils.Runnables;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Base {@link SamplerSpec} for seekable streams. It wraps the stream in a {@link Firehose} and hands that firehose
+ * to a {@link FirehoseSampler} together with the spec's {@link DataSchema} and {@link SamplerConfig}.
+ *
+ * @param <PartitionIdType>    partition identifier type of the stream
+ * @param <SequenceOffsetType> sequence/offset type of the stream
+ */
+public abstract class SeekableStreamSamplerSpec<PartitionIdType, SequenceOffsetType> implements SamplerSpec
+{
+  /**
+   * Timeout, in milliseconds, passed to each {@link RecordSupplier#poll} call made by the sampler firehose.
+   */
+  private static final int POLL_TIMEOUT_MS = 100;
+
+  private final DataSchema dataSchema;
+  private final FirehoseSampler firehoseSampler;
+
+  protected final SeekableStreamSupervisorIOConfig ioConfig;
+  protected final SeekableStreamSupervisorTuningConfig tuningConfig;
+  protected final SamplerConfig samplerConfig;
+
+  /**
+   * @param ingestionSpec   supervisor spec to sample; it and its ioConfig must be non-null
+   * @param samplerConfig   sampler settings forwarded to {@link FirehoseSampler#sample}
+   * @param firehoseSampler sampler that consumes the firehose produced by {@link #getFirehose}
+   *
+   * @throws NullPointerException if {@code ingestionSpec} or its ioConfig is null
+   */
+  public SeekableStreamSamplerSpec(
+      final SeekableStreamSupervisorSpec ingestionSpec,
+      final SamplerConfig samplerConfig,
+      final FirehoseSampler firehoseSampler
+  )
+  {
+    this.dataSchema = Preconditions.checkNotNull(ingestionSpec, "[spec] is required").getDataSchema();
+    this.ioConfig = Preconditions.checkNotNull(ingestionSpec.getIoConfig(), "[spec.ioConfig] is required");
+    this.tuningConfig = ingestionSpec.getTuningConfig();
+    this.samplerConfig = samplerConfig;
+    this.firehoseSampler = firehoseSampler;
+  }
+
+  /**
+   * Runs the sampler. The {@link FirehoseFactory} given to the sampler ignores the temporary directory and
+   * delegates every {@code connect} call to {@link #getFirehose}.
+   */
+  @Override
+  public SamplerResponse sample()
+  {
+    return firehoseSampler.sample(
+        new FirehoseFactory()
+        {
+          @Override
+          public Firehose connect(InputRowParser parser, @Nullable File temporaryDirectory)
+          {
+            return getFirehose(parser);
+          }
+        },
+        dataSchema,
+        samplerConfig
+    );
+  }
+
+  /**
+   * Creates the firehose read by the sampler. Invoked from the {@link FirehoseFactory} built in {@link #sample()}.
+   */
+  protected abstract Firehose getFirehose(InputRowParser parser);
+
+  /**
+   * Firehose that polls a {@link RecordSupplier} assigned to every partition of {@code ioConfig.getStream()}.
+   * It starts from the earliest or latest offsets depending on {@code ioConfig.isUseEarliestSequenceNumber()}.
+   * {@link #hasMore()} stays true until {@link #close()} is called.
+   */
+  protected abstract class SeekableStreamSamplerFirehose implements Firehose
+  {
+    private final InputRowParser parser;
+    private final RecordSupplier<PartitionIdType, SequenceOffsetType> recordSupplier;
+
+    private Iterator<OrderedPartitionableRecord<PartitionIdType, SequenceOffsetType>> recordIterator;
+    private Iterator<byte[]> recordDataIterator;
+
+    private volatile boolean closed = false;
+
+    /**
+     * Obtains the record supplier, assigns it to all partitions and seeks. If assigning or seeking fails, the
+     * supplier is closed before the exception propagates.
+     *
+     * @throws SamplerException if interrupted while seeking; the thread's interrupt flag is restored first
+     */
+    protected SeekableStreamSamplerFirehose(InputRowParser parser)
+    {
+      this.parser = parser;
+
+      // NOTE(review): presumably resets StringInputRowParser's per-file parsing state before sampling begins.
+      if (parser instanceof StringInputRowParser) {
+        ((StringInputRowParser) parser).startFileFromBeginning();
+      }
+
+      this.recordSupplier = getRecordSupplier();
+
+      try {
+        assignAndSeek();
+      }
+      catch (InterruptedException e) {
+        // Restore the interrupt status so callers further up the stack can still observe the interruption.
+        Thread.currentThread().interrupt();
+        final SamplerException failure = new SamplerException(e, "Exception while seeking to partitions");
+        // This firehose is never returned to a caller, so nobody else can close the supplier.
+        closeSupplierAfterFailure(failure);
+        throw failure;
+      }
+      catch (RuntimeException e) {
+        closeSupplierAfterFailure(e);
+        throw e;
+      }
+    }
+
+    /**
+     * Returns {@code true} until {@link #close()} is called. The stream is treated as unbounded.
+     */
+    @Override
+    public boolean hasMore()
+    {
+      return !closed;
+    }
+
+    /**
+     * Returns the parsed row from {@link #nextRowWithRaw()}. That row may be null when no data was available.
+     *
+     * @throws ParseException if the next payload could not be parsed
+     */
+    @Nullable
+    @Override
+    public InputRow nextRow()
+    {
+      InputRowPlusRaw row = nextRowWithRaw();
+      if (row.getParseException() != null) {
+        throw row.getParseException();
+      }
+
+      return row.getInputRow();
+    }
+
+    /**
+     * Returns the next raw payload together with the first row parsed from it.
+     *
+     * An empty result (null row, null raw) is returned when the poll yielded no records or the next record
+     * carried no data. Parse failures are returned in the result, not thrown. Only the first row of a
+     * multi-row parse is kept.
+     */
+    @Override
+    public InputRowPlusRaw nextRowWithRaw()
+    {
+      if (recordDataIterator == null || !recordDataIterator.hasNext()) {
+        if (recordIterator == null || !recordIterator.hasNext()) {
+          recordIterator = recordSupplier.poll(POLL_TIMEOUT_MS).iterator();
+
+          if (!recordIterator.hasNext()) {
+            return InputRowPlusRaw.of((InputRow) null, null);
+          }
+        }
+
+        recordDataIterator = recordIterator.next().getData().iterator();
+
+        if (!recordDataIterator.hasNext()) {
+          return InputRowPlusRaw.of((InputRow) null, null);
+        }
+      }
+
+      byte[] raw = recordDataIterator.next();
+
+      try {
+        List<InputRow> rows = parser.parseBatch(ByteBuffer.wrap(raw));
+        return InputRowPlusRaw.of(rows.isEmpty() ? null : rows.get(0), raw);
+      }
+      catch (ParseException e) {
+        return InputRowPlusRaw.of(raw, e);
+      }
+    }
+
+    /**
+     * Sampling never commits offsets, so this returns a no-op runnable.
+     */
+    @Override
+    public Runnable commit()
+    {
+      return Runnables.getNoopRunnable();
+    }
+
+    /**
+     * Marks the firehose closed and closes the record supplier. Calls after the first are ignored.
+     */
+    @Override
+    public void close()
+    {
+      if (closed) {
+        return;
+      }
+
+      closed = true;
+      recordSupplier.close();
+    }
+
+    /**
+     * Assigns the supplier to every partition of the configured stream. It then seeks all of them to the
+     * earliest offsets when {@code useEarliestSequenceNumber} is set, and to the latest otherwise.
+     */
+    private void assignAndSeek() throws InterruptedException
+    {
+      final Set<StreamPartition<PartitionIdType>> partitions = recordSupplier
+          .getPartitionIds(ioConfig.getStream())
+          .stream()
+          .map(x -> StreamPartition.of(ioConfig.getStream(), x))
+          .collect(Collectors.toSet());
+
+      recordSupplier.assign(partitions);
+
+      if (ioConfig.isUseEarliestSequenceNumber()) {
+        recordSupplier.seekToEarliest(partitions);
+      } else {
+        recordSupplier.seekToLatest(partitions);
+      }
+    }
+
+    /**
+     * Closes the record supplier after construction failed. Any failure from close is attached to {@code cause}
+     * as a suppressed exception, so the original error is not masked.
+     */
+    private void closeSupplierAfterFailure(Throwable cause)
+    {
+      try {
+        recordSupplier.close();
+      }
+      catch (RuntimeException closeFailure) {
+        cause.addSuppressed(closeFailure);
+      }
+    }
+
+    /**
+     * Supplies the record supplier this firehose reads from.
+     *
+     * This is called from this class's constructor. Implementations therefore run before their own instance
+     * fields are initialized and must not depend on them.
+     */
+    protected abstract RecordSupplier<PartitionIdType, SequenceOffsetType> getRecordSupplier();
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]