code-hard-play-harder commented on code in PR #140:
URL:
https://github.com/apache/flink-connector-aws/pull/140#discussion_r1669689616
##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -217,6 +217,38 @@ properties by providing a value for
`ConsumerConfigConstants.STREAM_INITIAL_TIME
If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
(for example, timestamp value is `2016-04-04` and pattern is `yyyy-MM-dd`
given by user or timestamp value is `2016-04-04T19:58:46.480-00:00` without
given a pattern).
+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles new streams the same way it handles a new shard for an existing stream, and it starts consuming from the earliest record (same behaviour as TRIM_HORIZON).
+
+This behaviour is fine if you're consuming from a stream that you don't want to lose any data from. However, if you're consuming from a stream with a large retention where it is fine to start consuming from "now",
+or more generally from the position defined in `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, this was not possible before.
+
+This behaviour can now be enabled by setting the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag to true, which will make ALL new streams "reset" to consume from the initial position
+instead of starting from the beginning.
+
+If you just want to force a particular new stream to start consuming from the defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property (described below) instead.
+
+### Resetting specific streams to the starting position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of the offset that the application is at for each shard, so that if the application is restarted we can start consuming from that offset
+when restoring from snapshot.
+
+This is the ideal behaviour most of the time, but what if you want to jump to `LATEST` or go back to `TRIM_HORIZON` for a stream that is already being tracked by the Flink Kinesis Consumer?
+
+You can now do this via the `ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property, which expects a comma separated list of strings referring to the names of the Kinesis Streams to reset.
+
+For example, if you configure your application with
+```
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
Review Comment:
I was wondering whether it is possible to give different streams different
initial positions. Say we add `streamA`, `streamB` and `streamC` as new
streams, and I want `streamA` and `streamB` to consume from `LATEST` and
`streamC` from `AT_TIMESTAMP`. Is this possible?
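
To make the scenario concrete, here is a rough sketch of how I read the documented behaviour. It uses plain `java.util.Properties` so it runs without Flink. The key strings are placeholders standing in for the `ConsumerConfigConstants` fields (not their real values), and `positionFor` is a hypothetical helper that only models my understanding; it is not part of the connector.

```java
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

class InitialPositionSketch {
    // Placeholder keys, assumed for illustration only; the real values live in ConsumerConfigConstants.
    static final String STREAM_INITIAL_POSITION = "sketch.stream.initpos";
    static final String STREAMS_TO_RESET = "sketch.streams.to.reset";

    // Hypothetical helper: the position a stream would start from when there is
    // one global initial position plus a comma-separated list of streams to apply it to.
    static String positionFor(Properties config, String stream) {
        Set<String> targets = new HashSet<>();
        for (String name : config.getProperty(STREAMS_TO_RESET, "").split(",")) {
            targets.add(name.trim());
        }
        return targets.contains(stream)
                ? config.getProperty(STREAM_INITIAL_POSITION)
                : "TRIM_HORIZON"; // new streams otherwise start from the earliest record
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(STREAM_INITIAL_POSITION, "LATEST");
        config.setProperty(STREAMS_TO_RESET, "streamA,streamB,streamC");
        for (String s : new String[] {"streamA", "streamB", "streamC"}) {
            System.out.println(s + " -> " + positionFor(config, s));
        }
    }
}
```

With a single global initial position, every listed stream resolves to the same value, so I don't see how `streamC` could get `AT_TIMESTAMP` while the other two get `LATEST`.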