antssilva96 commented on code in PR #140:
URL:
https://github.com/apache/flink-connector-aws/pull/140#discussion_r1670116361
##########
docs/content/docs/connectors/datastream/kinesis.md:
##########
@@ -217,6 +217,38 @@ properties by providing a value for
`ConsumerConfigConstants.STREAM_INITIAL_TIME
If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined
then the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
(for example, a timestamp value of `2016-04-04` with the user-provided pattern
`yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` without
a pattern).
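To make the two accepted forms concrete, here is a small sketch that parses the example values with `java.text.SimpleDateFormat`. It assumes the pattern is interpreted with `SimpleDateFormat` semantics; the class and the helper `parseInitialTimestamp` are hypothetical and are not the connector's own parsing code.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class InitialTimestampParsing {
    // Default pattern quoted in the docs when STREAM_TIMESTAMP_DATE_FORMAT is not defined.
    public static final String DEFAULT_PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX";

    // Hypothetical helper (not the connector's code): parses with the user's pattern
    // if one is given, otherwise with the default pattern.
    public static Date parseInitialTimestamp(String value, String userPattern) {
        String pattern = (userPattern != null) ? userPattern : DEFAULT_PATTERN;
        try {
            return new SimpleDateFormat(pattern).parse(value);
        } catch (ParseException e) {
            throw new IllegalArgumentException(
                    "Cannot parse '" + value + "' with pattern '" + pattern + "'", e);
        }
    }

    public static void main(String[] args) {
        // No pattern given: the value carries its own UTC offset, so the result is unambiguous.
        Date fromDefault = parseInitialTimestamp("2016-04-04T19:58:46.480-00:00", null);
        System.out.println(fromDefault.toInstant()); // 2016-04-04T19:58:46.480Z

        // User pattern without a zone: SimpleDateFormat uses the JVM's default time zone.
        Date fromCustom = parseInitialTimestamp("2016-04-04", "yyyy-MM-dd");
        System.out.println(fromCustom.toInstant());
    }
}
```

Note that a pattern without a zone field, such as `yyyy-MM-dd`, makes the resulting instant depend on the default time zone of the JVM doing the parsing.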
+### Configuring starting position for new streams
+
+By default, the Flink Kinesis Consumer handles a new stream the same way it
handles a new shard of an existing stream: it starts consuming from the
earliest record (the same behaviour as `TRIM_HORIZON`).
+
+This behaviour is fine if you're consuming from a stream and don't want to
lose any data. However, if you're consuming from a stream with a long
retention period, where it is fine to start consuming from "now"
+or, more generally, from the position defined in
`ConsumerConfigConstants.STREAM_INITIAL_POSITION`, there was previously no way
to make a new stream start from that position.
+
+Starting new streams from the initial position can now be enabled by setting
the `ConsumerConfigConstants.APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS` flag
to `true`, which makes ALL new streams "reset" to consume from the initial
position
+instead of starting from the beginning.
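The rule described above can be modelled as a small decision function. This is a hypothetical sketch, not the connector's implementation: it assumes a "new stream" is one with no entry in the restored state, and it uses the placeholder `RESTORED_OFFSET` for streams that resume from their snapshot offsets. The class and method names are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class NewStreamStartPositions {
    // Hypothetical model, not the connector's code: decides where each subscribed stream
    // starts after a restore, based only on whether the stream has stored state.
    public static Map<String, String> startPositions(
            List<String> subscribedStreams,
            Set<String> streamsInRestoredState,
            String initialPosition,      // value of STREAM_INITIAL_POSITION, e.g. "LATEST"
            boolean applyToNewStreams) { // value of APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS
        Map<String, String> result = new LinkedHashMap<>();
        for (String stream : subscribedStreams) {
            if (streamsInRestoredState.contains(stream)) {
                result.put(stream, "RESTORED_OFFSET"); // keeps the offsets from the snapshot
            } else if (applyToNewStreams) {
                result.put(stream, initialPosition);   // flag set: new stream uses the initial position
            } else {
                result.put(stream, "TRIM_HORIZON");    // default: treated like a new shard
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> streams = List.of("orders", "clicks");
        Set<String> restored = Set.of("orders");
        System.out.println(startPositions(streams, restored, "LATEST", false));
        // {orders=RESTORED_OFFSET, clicks=TRIM_HORIZON}
        System.out.println(startPositions(streams, restored, "LATEST", true));
        // {orders=RESTORED_OFFSET, clicks=LATEST}
    }
}
```

In this model the flag only changes the outcome for streams without stored state; streams already present in the snapshot are unaffected.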
+
+If you just want to force a particular new stream to start consuming from the
defined `ConsumerConfigConstants.STREAM_INITIAL_POSITION`, you can use the
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property
(described below) instead.
+
+### Resetting specific streams to the starting position
+
+One of the features of the Flink Kinesis Consumer is that it keeps track of
the offset the application has reached in each shard, so that when the
application is restarted and restored from a snapshot, it resumes consuming
+from that offset.
+
+This is the ideal behaviour most of the time, but what if you want to jump to
`LATEST` or go back to `TRIM_HORIZON` for a stream that is already being
tracked by the Flink Kinesis Consumer?
+
+You can now do this via the
`ConsumerConfigConstants.STREAMS_TO_APPLY_STREAM_INITIAL_POSITION_TO` property,
which expects a comma-separated list of the names of the Kinesis streams to
reset.
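A minimal sketch of how such a list could be parsed and applied. `parseStreamNames` and `startPosition` are hypothetical helpers; trimming whitespace around names and the `RESTORED_OFFSET` placeholder are assumptions of this sketch rather than documented connector behaviour. For simplicity it ignores `APPLY_STREAM_INITIAL_POSITION_FOR_NEW_STREAMS`.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class StreamResetList {
    // Hypothetical helper: splits the comma-separated property value into stream names.
    // Trimming spaces and skipping empty entries are assumptions of this sketch.
    public static Set<String> parseStreamNames(String propertyValue) {
        if (propertyValue == null || propertyValue.trim().isEmpty()) {
            return new LinkedHashSet<>();
        }
        return Arrays.stream(propertyValue.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    // Hypothetical model: a listed stream ignores its stored offsets and starts from the
    // initial position; any other stream resumes from its restored offsets, or falls back
    // to TRIM_HORIZON if it has none (the default for new streams).
    public static String startPosition(
            String stream, boolean hasStoredOffsets, Set<String> streamsToReset, String initialPosition) {
        if (streamsToReset.contains(stream)) {
            return initialPosition;
        }
        return hasStoredOffsets ? "RESTORED_OFFSET" : "TRIM_HORIZON";
    }

    public static void main(String[] args) {
        Set<String> toReset = parseStreamNames("clicks, payments");
        System.out.println(toReset); // [clicks, payments]
        System.out.println(startPosition("clicks", true, toReset, "LATEST")); // LATEST
        System.out.println(startPosition("orders", true, toReset, "LATEST")); // RESTORED_OFFSET
    }
}
```

This model is stateless: it applies the reset whenever a stream is listed, so it says nothing about whether the connector resets a listed stream again on later restores.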
+
+For example, if you configure your application with
+```java
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
Review Comment:
This was not possible before and is still not possible now 😅
Even though it would be possible to do that with minimal changes, I feel
like that is another feature request on its own and probably doesn't belong in
this one.
The main goal of this change is to **allow users to specify when the INITIAL
position should be used, regardless of the stored state.**
--
This is an automated message from the Apache Git Service.