navina commented on a change in pull request #8333:
URL: https://github.com/apache/pinot/pull/8333#discussion_r823397381
##########
File path:
pinot-tools/src/main/resources/examples/stream/githubEvents/pullRequestMergedEvents_kinesis_realtime_table_config.json
##########
@@ -0,0 +1,39 @@
+{
+ "tableName": "pullRequestMergedEvents",
+ "tableType": "REALTIME",
+ "segmentsConfig": {
+ "timeColumnName": "mergedTimeMillis",
+ "retentionTimeUnit": "DAYS",
+ "retentionTimeValue": "60",
+ "schemaName": "pullRequestMergedEvents",
+ "replication": "1",
+ "replicasPerPartition": "1"
+ },
+ "tenants": {},
+ "tableIndexConfig": {
+ "loadMode": "MMAP",
+ "invertedIndexColumns": [
+ "organization",
+ "repo"
+ ],
+ "streamConfigs": {
+ "streamType": "kinesis",
+ "stream.kinesis.consumer.type": "lowlevel",
+ "stream.kinesis.topic.name": "pullRequestMergedEvents",
+ "stream.kinesis.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
+ "stream.kinesis.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kinesis.KinesisConsumerFactory",
+ "realtime.segment.flush.threshold.time": "12h",
+ "realtime.segment.flush.threshold.size": "100000",
+ "stream.kinesis.consumer.prop.auto.offset.reset": "smallest",
+ "region": "us-east-1",
+ "shardIteratorType": "TRIM_HORIZON",
Review comment:
What does `TRIM_HORIZON` mean? I only see `AFTER_SEQUENCE_NUMBER` and `LATEST` in the [docs](https://docs.pinot.apache.org/basics/data-import/pinot-stream-ingestion/amazon-kinesis).
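For context on the question above: `TRIM_HORIZON` is one of the shard iterator types defined by the Kinesis `GetShardIterator` API, and it means "start reading at the oldest record still retained in the shard", roughly what `auto.offset.reset=smallest` means for Kafka. The sketch below is a hypothetical, self-contained model of those iterator types. The `ShardIteratorTypes` class, the `StartPosition` enum and the `fromConfig` helper are illustrative names, not AWS SDK or Pinot APIs, and the sketch does not confirm which values Pinot's Kinesis consumer actually accepts.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, self-contained model of the shard iterator types defined by the
// Kinesis GetShardIterator API. This is NOT the AWS SDK's ShardIteratorType class
// and NOT Pinot code; it only documents what each starting position means.
class ShardIteratorTypes {

  enum StartPosition {
    // Oldest record still retained in the shard (the "trim horizon").
    TRIM_HORIZON(false, false),
    // Just after the most recent record in the shard, so only newly written records are read.
    LATEST(false, false),
    // The record with a given sequence number, inclusive.
    AT_SEQUENCE_NUMBER(true, false),
    // Right after the record with a given sequence number, exclusive.
    AFTER_SEQUENCE_NUMBER(true, false),
    // The position denoted by a given timestamp.
    AT_TIMESTAMP(false, true);

    final boolean needsSequenceNumber;
    final boolean needsTimestamp;

    StartPosition(boolean needsSequenceNumber, boolean needsTimestamp) {
      this.needsSequenceNumber = needsSequenceNumber;
      this.needsTimestamp = needsTimestamp;
    }
  }

  // Maps a config string such as "TRIM_HORIZON" to a StartPosition, failing loudly on typos.
  static StartPosition fromConfig(String value) {
    try {
      return StartPosition.valueOf(value);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "Unknown shardIteratorType '" + value + "', expected one of "
              + Arrays.toString(StartPosition.values()), e);
    }
  }

  // Positions that need no extra argument (sequence number or timestamp) to be resolved.
  static Set<StartPosition> standalonePositions() {
    Set<StartPosition> result = EnumSet.noneOf(StartPosition.class);
    for (StartPosition p : StartPosition.values()) {
      if (!p.needsSequenceNumber && !p.needsTimestamp) {
        result.add(p);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(fromConfig("TRIM_HORIZON")); // TRIM_HORIZON
    System.out.println(standalonePositions());      // [TRIM_HORIZON, LATEST]
  }
}
```

In this model only `TRIM_HORIZON` and `LATEST` can be resolved without a per-shard sequence number or a timestamp, which is one plausible reason a static table config would pick one of them; the misspelled `AFTER_SEQUENCE_NUMER` would be rejected by `fromConfig`.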
##########
File path: pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
##########
@@ -39,6 +39,7 @@
<aws.version>2.14.28</aws.version>
<easymock.version>4.2</easymock.version>
<reactive.version>1.0.2</reactive.version>
+ <localstack-utils.version>0.2.19</localstack-utils.version>
Review comment:
@KKcorps I am wondering if you have explored using
[testcontainers](https://www.testcontainers.org/modules/localstack/) instead of
localstack directly.
##########
File path:
pinot-tools/src/main/java/org/apache/pinot/tools/utils/KinesisStarterUtils.java
##########
@@ -0,0 +1,52 @@
+package org.apache.pinot.tools.utils;
+
+import java.util.Optional;
+import java.util.Properties;
+import java.util.ServiceLoader;
+import java.util.stream.StreamSupport;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamDataProvider;
+import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.NetUtils;
+
+
+public class KinesisStarterUtils {
+ private KinesisStarterUtils(){}
+
+ public static final String DEFAULT_KINESIS_PORT = "4566";
+ public static final String DEFAULT_KINESIS_ENDPOINT = "http://localhost:"+DEFAULT_KINESIS_PORT;
+
+ public static final String KINESIS_SERVER_STARTABLE_CLASS_NAME =
+ getKinesisConnectorPackageName() + ".server.KinesisDataServerStartable";
+ public static final String KINESIS_PRODUCER_CLASS_NAME = getKinesisConnectorPackageName() + ".server.KinesisDataProducer";
+ public static final String KINESIS_STREAM_CONSUMER_FACTORY_CLASS_NAME =
+ getKinesisConnectorPackageName() + ".KinesisConsumerFactory";
+
+ public static final String PORT = "port";
+ public static final String NUM_SHARDS = "numShards";
+
+
+ private static String getKinesisConnectorPackageName() {
+ return "org.apache.pinot.plugin.stream.kinesis";
+ }
+
+ public static Properties getTopicCreationProps(int numKafkaPartitions) {
Review comment:
s/numKafkaPartitions/numKinesisShards/
--
This is an automated message from the Apache Git Service.