This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f60bfc8 Add support for Kinesis datasource in Github Events recipe
(#8333)
f60bfc8 is described below
commit f60bfc8df6172c7d00f7ca9bd9d2995dde8d33e3
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Mar 18 00:07:03 2022 +0530
Add support for Kinesis datasource in Github Events recipe (#8333)
* Add support for Kinesis datasource in Github Events recipe
* Add support to publish github events in StreamGithubEventsCommand
* Replace string with Constants
* refactor: rename kinesis variables to reflect they are kinesis props
* Add missing license headers
* Fix linting errors
* Refactor: Rename variables to be kinesis provider agnostic, Modify
javadoc to reflect kinesis support
* Add timeouts while checking status of Kinesis stream
* Modify Kinesis producer to detach static aws credentials from endpoint
provider
* Make accessKey and secretKey configurable for Stream command
* Fix linting errors
* fix linting
---
.../pinot-stream-ingestion/pinot-kinesis/pom.xml | 13 ++
.../stream/kinesis/server/KinesisDataProducer.java | 104 +++++++++++++++
.../kinesis/server/KinesisDataServerStartable.java | 145 +++++++++++++++++++++
.../apache/pinot/tools/GitHubEventsQuickstart.java | 86 +++++++++---
.../command/GitHubEventsQuickStartCommand.java | 12 +-
.../admin/command/StreamGitHubEventsCommand.java | 41 +++++-
.../PullRequestMergedEventsStream.java | 37 ++++++
.../pinot/tools/utils/KinesisStarterUtils.java | 66 ++++++++++
.../apache/pinot/tools/utils/StreamSourceType.java | 24 ++++
...MergedEvents_kinesis_realtime_table_config.json | 39 ++++++
10 files changed, 545 insertions(+), 22 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
index 4bdbe1a..e79c881 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
@@ -39,6 +39,7 @@
<aws.version>2.14.28</aws.version>
<easymock.version>4.2</easymock.version>
<reactive.version>1.0.2</reactive.version>
+ <localstack-utils.version>0.2.19</localstack-utils.version>
</properties>
<dependencyManagement>
@@ -185,6 +186,18 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>cloud.localstack</groupId>
+ <artifactId>localstack-utils</artifactId>
+ <version>${localstack-utils.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
<profiles>
<profile>
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
new file mode 100644
index 0000000..150ac78
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataProducer.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis.server;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.pinot.spi.stream.StreamDataProducer;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
+import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
+
+public class KinesisDataProducer implements StreamDataProducer {
+ public static final String ENDPOINT = "endpoint";
+ public static final String REGION = "region";
+ public static final String ACCESS = "access";
+ public static final String SECRET = "secret";
+ public static final String DEFAULT_PORT = "4566";
+ public static final String DEFAULT_ENDPOINT = "http://localhost:4566";
+
+ private KinesisClient _kinesisClient;
+
+ @Override
+ public void init(Properties props) {
+ try {
+ KinesisClientBuilder kinesisClientBuilder;
+ if (props.containsKey(ACCESS) && props.containsKey(SECRET)) {
+ kinesisClientBuilder =
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(getLocalAWSCredentials(props))
+ .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+ } else {
+ kinesisClientBuilder =
+
KinesisClient.builder().region(Region.of(props.getProperty(REGION)))
+ .credentialsProvider(DefaultCredentialsProvider.create())
+ .httpClientBuilder(new
ApacheSdkHttpService().createHttpClientBuilder());
+ }
+
+ if (props.containsKey(ENDPOINT)) {
+ String kinesisEndpoint = props.getProperty(ENDPOINT, DEFAULT_ENDPOINT);
+ try {
+ kinesisClientBuilder = kinesisClientBuilder.endpointOverride(new
URI(kinesisEndpoint));
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException("URI syntax is not correctly
specified for endpoint: "
+ + kinesisEndpoint, e);
+ }
+ }
+
+ _kinesisClient = kinesisClientBuilder.build();
+ } catch (Exception e) {
+ _kinesisClient = null;
+ }
+ }
+
+ @Override
+ public void produce(String topic, byte[] payload) {
+ PutRecordRequest putRecordRequest =
+
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload))
+ .partitionKey(UUID.randomUUID().toString()).build();
+ PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
+ }
+
+ @Override
+ public void produce(String topic, byte[] key, byte[] payload) {
+ PutRecordRequest putRecordRequest =
+
PutRecordRequest.builder().streamName(topic).data(SdkBytes.fromByteArray(payload)).partitionKey(new
String(key))
+ .build();
+ PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
+ }
+
+ @Override
+ public void close() {
+ _kinesisClient.close();
+ }
+
+ private AwsCredentialsProvider getLocalAWSCredentials(Properties props) {
+ return StaticCredentialsProvider.create(
+ AwsBasicCredentials.create(props.getProperty(ACCESS),
props.getProperty(SECRET)));
+ }
+}
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
new file mode 100644
index 0000000..4bf07f0
--- /dev/null
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/server/KinesisDataServerStartable.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.plugin.stream.kinesis.server;
+
+import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
+import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
+import com.google.common.base.Function;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
+import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
+import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.http.SdkHttpConfigurationOption;
+import software.amazon.awssdk.http.apache.ApacheSdkHttpService;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
+import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
+import software.amazon.awssdk.utils.AttributeMap;
+
+
+public class KinesisDataServerStartable implements StreamDataServerStartable {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(KinesisDataServerStartable.class);
+
+ public static final String NUM_SHARDS_PROPERTY = "numShards";
+ public static final String DEFAULT_REGION = "us-east-1";
+ public static final String DEFAULT_ACCESS_KEY = "access";
+ public static final String DEFAULT_SECRET_KEY = "secret";
+ public static final String DEFAULT_PORT = "4566";
+
+ private final Localstack _localstackDocker = Localstack.INSTANCE;
+ LocalstackDockerConfiguration _dockerConfig;
+ Properties _serverProperties;
+ private String _localStackKinesisEndpoint = "http://localhost:%s";
+
+ @Override
+ public void init(Properties props) {
+ _serverProperties = props;
+ final Map<String, String> environmentVariables = new HashMap<>();
+ environmentVariables.put("SERVICES", ServiceName.KINESIS);
+ _dockerConfig =
+
LocalstackDockerConfiguration.builder().portEdge(_serverProperties.getProperty("port",
DEFAULT_PORT))
+
.portElasticSearch(String.valueOf(NetUtils.findOpenPort(4571))).imageTag("0.12.15")
+ .environmentVariables(environmentVariables).build();
+
+ _localStackKinesisEndpoint =
+ String.format(_localStackKinesisEndpoint,
_serverProperties.getProperty("port", DEFAULT_PORT));
+ }
+
+ @Override
+ public void start() {
+ _localstackDocker.startup(_dockerConfig);
+ }
+
+ @Override
+ public void stop() {
+ _localstackDocker.stop();
+ }
+
+ @Override
+ public void createTopic(String topic, Properties topicProps) {
+ try {
+ KinesisClient kinesisClient = KinesisClient.builder().httpClient(
+ new
ApacheSdkHttpService().createHttpClientBuilder().buildWithDefaults(
+
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
Boolean.TRUE).build()))
+
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(DEFAULT_REGION))
+ .endpointOverride(new URI(_localStackKinesisEndpoint)).build();
+
+ kinesisClient.createStream(
+ CreateStreamRequest.builder().streamName(topic).shardCount((Integer)
topicProps.get(NUM_SHARDS_PROPERTY))
+ .build());
+
+ waitForCondition(new Function<Void, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable Void aVoid) {
+ try {
+ String kinesisStreamStatus =
+
kinesisClient.describeStream(DescribeStreamRequest.builder().streamName(topic).build())
+ .streamDescription().streamStatusAsString();
+
+ return kinesisStreamStatus.contentEquals("ACTIVE");
+ } catch (Exception e) {
+ LOGGER.warn("Could not fetch kinesis stream status", e);
+ return null;
+ }
+ }
+ }, 1000L, 30000, "Kinesis stream " + topic + " is not created or is not
in active state");
+
+ LOGGER.info("Kinesis stream created successfully: " + topic);
+ } catch (Exception e) {
+ LOGGER.warn("Error occurred while creating topic: " + topic, e);
+ }
+ }
+
+ @Override
+ public int getPort() {
+ return _localstackDocker.getEdgePort();
+ }
+
+ private AwsCredentialsProvider getLocalAWSCredentials() {
+ return
StaticCredentialsProvider.create(AwsBasicCredentials.create(DEFAULT_ACCESS_KEY,
DEFAULT_SECRET_KEY));
+ }
+
+ private static void waitForCondition(Function<Void, Boolean> condition, long
checkIntervalMs, long timeoutMs,
+ @Nullable String errorMessage) {
+ long endTime = System.currentTimeMillis() + timeoutMs;
+ String errorMessageSuffix = errorMessage != null ? ", error message: " +
errorMessage : "";
+ while (System.currentTimeMillis() < endTime) {
+ try {
+ if (Boolean.TRUE.equals(condition.apply(null))) {
+ return;
+ }
+ Thread.sleep(checkIntervalMs);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception while checking the condition" +
errorMessageSuffix, e);
+ }
+ }
+ LOGGER.error("Failed to meet condition in " + timeoutMs + "ms" +
errorMessageSuffix);
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
index 0af8d64..50c162d 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/GitHubEventsQuickstart.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.net.URL;
import java.util.Arrays;
import java.util.List;
+import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
import org.apache.pinot.spi.stream.StreamDataProvider;
@@ -32,6 +33,8 @@ import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.KinesisStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,15 +43,16 @@ import static
org.apache.pinot.tools.Quickstart.prettyPrintResponse;
/**
* Sets up a demo Pinot cluster with 1 zookeeper, 1 controller, 1 broker and 1
server
- * Sets up a demo Kafka cluster, and creates a topic pullRequestMergedEvents
+ * Sets up a demo Kafka/Kinesis cluster, and creates a topic
pullRequestMergedEvents
* Creates a realtime table pullRequestMergedEvents
* Starts the {@link PullRequestMergedEventsStream} to publish
pullRequestMergedEvents into the topic
*/
public class GitHubEventsQuickstart extends QuickStartBase {
private static final Logger LOGGER =
LoggerFactory.getLogger(GitHubEventsQuickstart.class);
- private StreamDataServerStartable _kafkaStarter;
+ private StreamDataServerStartable _serverStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private String _personalAccessToken;
+ private StreamSourceType _sourceType;
public GitHubEventsQuickstart() {
}
@@ -56,16 +60,46 @@ public class GitHubEventsQuickstart extends QuickStartBase {
private void startKafka() {
_zookeeperInstance = ZkStarter.startLocalZkServer();
try {
- _kafkaStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
+ _serverStarter =
StreamDataProvider.getServerDataStartable(KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME,
KafkaStarterUtils.getDefaultKafkaConfiguration(_zookeeperInstance));
} catch (Exception e) {
throw new RuntimeException("Failed to start " +
KafkaStarterUtils.KAFKA_SERVER_STARTABLE_CLASS_NAME, e);
}
- _kafkaStarter.start();
- _kafkaStarter.createTopic("pullRequestMergedEvents",
KafkaStarterUtils.getTopicCreationProps(2));
+ _serverStarter.start();
+ _serverStarter.createTopic("pullRequestMergedEvents",
KafkaStarterUtils.getTopicCreationProps(2));
}
- private void execute(String personalAccessToken)
+ private void startKinesis() {
+ try {
+
+ Properties serverProperties = new Properties();
+ serverProperties.put(KinesisStarterUtils.PORT, 4566);
+ _serverStarter =
+
StreamDataProvider.getServerDataStartable(KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME,
+ serverProperties);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " +
KinesisStarterUtils.KINESIS_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ _serverStarter.start();
+
+ Properties topicProperties = new Properties();
+ topicProperties.put(KinesisStarterUtils.NUM_SHARDS, 3);
+ _serverStarter.createTopic("pullRequestMergedEvents", topicProperties);
+ }
+
+ private void startStreamServer() {
+ switch (_sourceType) {
+ case KINESIS:
+ startKinesis();
+ break;
+ case KAFKA:
+ default:
+ startKafka();
+ break;
+ }
+ }
+
+ private void execute(String personalAccessToken, StreamSourceType
streamSourceType)
throws Exception {
final File quickStartDataDir =
new File(new File("githubEvents-" + System.currentTimeMillis()),
"pullRequestMergedEvents");
@@ -81,8 +115,8 @@ public class GitHubEventsQuickstart extends QuickStartBase {
URL resource =
classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_schema.json");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, schemaFile);
- resource =
-
classLoader.getResource("examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json");
+ String tableConfigFilePath = getTableConfigFilePath();
+ resource = classLoader.getResource(tableConfigFilePath);
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, tableConfigFile);
@@ -92,8 +126,8 @@ public class GitHubEventsQuickstart extends QuickStartBase {
final QuickstartRunner runner =
new QuickstartRunner(Lists.newArrayList(request), 1, 1, 1, 0, tempDir,
getConfigOverrides());
- printStatus(Color.CYAN, "***** Starting Kafka *****");
- startKafka();
+ printStatus(Color.CYAN, String.format("***** Starting %s *****",
streamSourceType));
+ startStreamServer();
printStatus(Color.CYAN, "***** Starting zookeeper, controller, server and
broker *****");
runner.startAll();
@@ -101,10 +135,11 @@ public class GitHubEventsQuickstart extends
QuickStartBase {
printStatus(Color.CYAN, "***** Adding pullRequestMergedEvents table
*****");
runner.bootstrapTable();
- printStatus(Color.CYAN, "***** Starting pullRequestMergedEvents data
stream and publishing to Kafka *****");
+ printStatus(Color.CYAN,
+ String.format("***** Starting pullRequestMergedEvents data stream and
publishing to %s *****", _sourceType));
final PullRequestMergedEventsStream pullRequestMergedEventsStream =
- new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(),
"pullRequestMergedEvents",
- personalAccessToken,
PullRequestMergedEventsStream.getKafkaStreamDataProducer());
+ new PullRequestMergedEventsStream(schemaFile.getAbsolutePath(),
"pullRequestMergedEvents", personalAccessToken,
+ PullRequestMergedEventsStream.getStreamDataProducer(_sourceType));
pullRequestMergedEventsStream.execute();
printStatus(Color.CYAN, "***** Waiting for 10 seconds for a few events to
get populated *****");
Thread.sleep(10000);
@@ -113,7 +148,7 @@ public class GitHubEventsQuickstart extends QuickStartBase {
try {
printStatus(Color.GREEN, "***** Shutting down GitHubEventsQuickStart
*****");
runner.stop();
- _kafkaStarter.stop();
+ _serverStarter.stop();
ZkStarter.stopLocalZkServer(_zookeeperInstance);
FileUtils.deleteDirectory(quickStartDataDir);
} catch (Exception e) {
@@ -156,6 +191,22 @@ public class GitHubEventsQuickstart extends QuickStartBase
{
printStatus(Color.GREEN, "You can always go to http://localhost:9000 to
play around in the query console");
}
+ private String getTableConfigFilePath() {
+ String tableConfigFilePath;
+ switch (_sourceType) {
+ case KINESIS:
+ tableConfigFilePath =
+
"examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json";
+ break;
+ case KAFKA:
+ default:
+ tableConfigFilePath =
+
"examples/stream/githubEvents/pullRequestMergedEvents_realtime_table_config.json";
+ break;
+ }
+ return tableConfigFilePath;
+ }
+
@Override
public List<String> types() {
return Arrays.asList("GITHUB-EVENTS", "GITHUB_EVENTS");
@@ -164,11 +215,16 @@ public class GitHubEventsQuickstart extends
QuickStartBase {
@Override
public void execute()
throws Exception {
- execute(_personalAccessToken);
+ execute(_personalAccessToken, _sourceType);
}
public GitHubEventsQuickstart setPersonalAccessToken(String
personalAccessToken) {
_personalAccessToken = personalAccessToken;
return this;
}
+
+ public GitHubEventsQuickstart setSourceType(String sourceType) {
+ _sourceType = StreamSourceType.valueOf(sourceType.toUpperCase());
+ return this;
+ }
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
index 578f94d..e4cc73d 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/GitHubEventsQuickStartCommand.java
@@ -33,6 +33,10 @@ public class GitHubEventsQuickStartCommand extends
AbstractBaseAdminCommand impl
@CommandLine.Option(names = {"-personalAccessToken"}, required = true,
description = "GitHub personal access token.")
private String _personalAccessToken;
+ @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
+ description = "Stream DataSource to use for ingesting data. Supported
values - Kafka,Kinesis")
+ private String _sourceType;
+
@CommandLine.Option(names = {"-help", "-h", "--h", "--help"}, help = true,
description = "Print this message.")
private boolean _help = false;
@@ -40,6 +44,10 @@ public class GitHubEventsQuickStartCommand extends
AbstractBaseAdminCommand impl
_personalAccessToken = personalAccessToken;
}
+ public void setSourceType(String sourceType) {
+ _sourceType = sourceType;
+ }
+
@Override
public boolean getHelp() {
return _help;
@@ -52,7 +60,7 @@ public class GitHubEventsQuickStartCommand extends
AbstractBaseAdminCommand impl
@Override
public String toString() {
- return ("GitHubEventsQuickStart -personalAccessToken " +
_personalAccessToken);
+ return ("GitHubEventsQuickStart -personalAccessToken " +
_personalAccessToken + " -sourceType" + _sourceType);
}
@Override
@@ -68,7 +76,7 @@ public class GitHubEventsQuickStartCommand extends
AbstractBaseAdminCommand impl
public boolean execute()
throws Exception {
PluginManager.get().init();
- new
GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).execute();
+ new
GitHubEventsQuickstart().setPersonalAccessToken(_personalAccessToken).setSourceType(_sourceType).execute();
return true;
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
index 269281d..579dd63 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StreamGitHubEventsCommand.java
@@ -19,14 +19,16 @@
package org.apache.pinot.tools.admin.command;
import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.tools.Command;
import
org.apache.pinot.tools.streams.githubevents.PullRequestMergedEventsStream;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
import picocli.CommandLine;
/**
- * Command to stream GitHub events into a kafka topic
+ * Command to stream GitHub events into a kafka topic or kinesis stream
*/
@CommandLine.Command(name = "StreamGitHubEvents")
public class StreamGitHubEventsCommand extends AbstractBaseAdminCommand
implements Command {
@@ -36,11 +38,29 @@ public class StreamGitHubEventsCommand extends
AbstractBaseAdminCommand implemen
@CommandLine.Option(names = {"-personalAccessToken"}, required = true,
description = "GitHub personal access token.")
private String _personalAccessToken;
+ @CommandLine.Option(names = {"-sourceType"}, defaultValue = "Kafka",
+ description = "Stream DataSource to use for ingesting data. Supported
values - Kafka,Kinesis")
+ private String _sourceType;
+
@CommandLine.Option(names = {"-kafkaBrokerList"},
description = "Kafka broker list of the kafka cluster to produce
events.")
private String _kafkaBrokerList = KafkaStarterUtils.DEFAULT_KAFKA_BROKER;
- @CommandLine.Option(names = {"-topic"}, required = true, description = "Name
of kafka topic to publish events.")
+ @CommandLine.Option(names = {"-kinesisEndpoint"},
+ description = "Endpoint of localstack or any other Kinesis cluster when
not using AWS.")
+ private String _kinesisEndpoint = null;
+
+ @CommandLine.Option(names = {"-awsRegion"}, description = "AWS Region in
which Kinesis is located")
+ private String _awsRegion = "us-east-1";
+
+ @CommandLine.Option(names = {"-awsAccessKey"}, description = "AccessKey for
AWS Account.")
+ private String _accessKey;
+
+ @CommandLine.Option(names = {"-awsSecretKey"}, description = "SecretKey for
AWS Account")
+ private String _secretKey;
+
+ @CommandLine.Option(names = {"-topic"}, required = true,
+ description = "Name of kafka-topic/kinesis-stream to publish events.")
private String _topic;
@CommandLine.Option(names = {"-eventType"},
@@ -96,7 +116,7 @@ public class StreamGitHubEventsCommand extends
AbstractBaseAdminCommand implemen
@Override
public String description() {
- return "Streams GitHubEvents into a Kafka topic";
+ return "Streams GitHubEvents into a Kafka topic or Kinesis Stream";
}
@Override
@@ -104,9 +124,20 @@ public class StreamGitHubEventsCommand extends
AbstractBaseAdminCommand implemen
throws Exception {
PluginManager.get().init();
if (PULL_REQUEST_MERGED_EVENT_TYPE.equals(_eventType)) {
+ StreamDataProducer streamDataProducer;
+ switch (StreamSourceType.valueOf(_sourceType.toUpperCase())) {
+ case KINESIS:
+ streamDataProducer =
+
PullRequestMergedEventsStream.getKinesisStreamDataProducer(_kinesisEndpoint,
_awsRegion, _accessKey,
+ _secretKey);
+ break;
+ case KAFKA:
+ default:
+ streamDataProducer =
PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList);
+ break;
+ }
PullRequestMergedEventsStream pullRequestMergedEventsStream =
- new PullRequestMergedEventsStream(_schemaFile, _topic,
_personalAccessToken,
-
PullRequestMergedEventsStream.getKafkaStreamDataProducer(_kafkaBrokerList));
+ new PullRequestMergedEventsStream(_schemaFile, _topic,
_personalAccessToken, streamDataProducer);
pullRequestMergedEventsStream.execute();
} else {
throw new UnsupportedOperationException("Event type " + _eventType + "
is unsupported");
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
index 465f2fd..ab45f45 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/githubevents/PullRequestMergedEventsStream.java
@@ -30,12 +30,15 @@ import java.util.concurrent.Executors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Quickstart;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
+import org.apache.pinot.tools.utils.KinesisStarterUtils;
+import org.apache.pinot.tools.utils.StreamSourceType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -99,6 +102,40 @@ public class PullRequestMergedEventsStream {
return
StreamDataProvider.getStreamDataProducer(KafkaStarterUtils.KAFKA_PRODUCER_CLASS_NAME,
properties);
}
+ public static StreamDataProducer getKinesisStreamDataProducer(String
endpoint, String region, String access,
+ String secret)
+ throws Exception {
+ Properties properties = new Properties();
+
+ if (StringUtils.isNotEmpty(access) && StringUtils.isNotEmpty(secret)) {
+ properties.put("access", access);
+ properties.put("secret", secret);
+ }
+
+ if (StringUtils.isNotEmpty(endpoint)) {
+ properties.put("endpoint", endpoint);
+ }
+ properties.put("region", region);
+ return
StreamDataProvider.getStreamDataProducer(KinesisStarterUtils.KINESIS_PRODUCER_CLASS_NAME,
properties);
+ }
+
+ public static StreamDataProducer getKinesisStreamDataProducer()
+ throws Exception {
+ return getKinesisStreamDataProducer("http://localhost:4566", "us-east-1",
"access", "secret");
+ }
+
+ public static StreamDataProducer getStreamDataProducer(StreamSourceType
streamSourceType)
+ throws Exception {
+ switch (streamSourceType) {
+ case KAFKA:
+ return getKafkaStreamDataProducer();
+ case KINESIS:
+ return getKinesisStreamDataProducer();
+ default:
+ throw new RuntimeException("Invalid streamSourceType specified: " +
streamSourceType);
+ }
+ }
+
public static void main(String[] args)
throws Exception {
String personalAccessToken = args[0];
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
new file mode 100644
index 0000000..3f6160a
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
+import java.util.Properties;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+
+
+public class KinesisStarterUtils {
+ private KinesisStarterUtils() {
+ }
+
+ public static final String DEFAULT_KINESIS_PORT = "4566";
+ public static final String DEFAULT_KINESIS_ENDPOINT = "http://localhost:" +
DEFAULT_KINESIS_PORT;
+
+ public static final String KINESIS_SERVER_STARTABLE_CLASS_NAME =
+ getKinesisConnectorPackageName() + ".server.KinesisDataServerStartable";
+ public static final String KINESIS_PRODUCER_CLASS_NAME =
+ getKinesisConnectorPackageName() + ".server.KinesisDataProducer";
+ public static final String KINESIS_STREAM_CONSUMER_FACTORY_CLASS_NAME =
+ getKinesisConnectorPackageName() + ".KinesisConsumerFactory";
+
+ public static final String PORT = "port";
+ public static final String NUM_SHARDS = "numShards";
+
+ private static String getKinesisConnectorPackageName() {
+ return "org.apache.pinot.plugin.stream.kinesis";
+ }
+
+ public static Properties getTopicCreationProps(int numKinesisShards) {
+ Properties topicProps = new Properties();
+ topicProps.put(NUM_SHARDS, numKinesisShards);
+ return topicProps;
+ }
+
+ public static StreamDataServerStartable startServer(final int port, final
Properties baseConf) {
+ StreamDataServerStartable kinesisStarter;
+ Properties configuration = new Properties(baseConf);
+ int kinesisPort = port;
+ try {
+ configuration.put(KinesisStarterUtils.PORT, kinesisPort);
+ kinesisStarter =
StreamDataProvider.getServerDataStartable(KINESIS_SERVER_STARTABLE_CLASS_NAME,
configuration);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to start " +
KINESIS_SERVER_STARTABLE_CLASS_NAME, e);
+ }
+ kinesisStarter.start();
+ return kinesisStarter;
+ }
+}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java
new file mode 100644
index 0000000..8464512
--- /dev/null
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/utils/StreamSourceType.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.tools.utils;
+
/**
 * The stream backends supported by the GitHub-events quickstart tooling.
 */
public enum StreamSourceType {
  KAFKA, KINESIS
}
diff --git
a/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
new file mode 100644
index 0000000..7cdd27b
--- /dev/null
+++
b/pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
@@ -0,0 +1,39 @@
+{
+ "tableName": "pullRequestMergedEvents",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "mergedTimeMillis",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "60",
+ "schemaName": "pullRequestMergedEvents",
+ "replication": "1",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {},
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "invertedIndexColumns": [
+ "organization",
+ "repo"
+ ],
+ "streamConfigs": {
+ "streamType": "kinesis",
+ "stream.kinesis.consumer.type": "lowlevel",
+ "stream.kinesis.topic.name": "pullRequestMergedEvents",
+ "stream.kinesis.decoder.class.name":
"org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
+ "stream.kinesis.consumer.factory.class.name":
"org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
+ "realtime.segment.flush.threshold.time": "12h",
+ "realtime.segment.flush.threshold.size": "100000",
+ "stream.kinesis.consumer.prop.auto.offset.reset": "smallest",
+ "region": "us-east-1",
+ "shardIteratorType": "TRIM_HORIZON",
+ "endpoint": "http://localhost:4566",
+ "accessKey": "access",
+ "secretKey": "secret"
+ }
+ },
+ "metadata": {
+ "customConfigs": {}
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]