Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3651#discussion_r108844803
--- Diff:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
---
@@ -115,10 +116,21 @@ protected ShardConsumer(KinesisDataFetcher<T>
fetcherRef,
if
(lastSequenceNum.equals(SentinelSequenceNumber.SENTINEL_AT_TIMESTAMP_SEQUENCE_NUM.get()))
{
String timestamp =
consumerConfig.getProperty(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP);
- try {
- this.initTimestamp =
KinesisConfigUtil.initTimestampDateFormat.parse(timestamp);
- } catch (ParseException e) {
- this.initTimestamp = new Date((long)
(Double.parseDouble(timestamp) * 1000));
+
+ if
(consumerConfig.containsKey(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT))
{
--- End diff --
I think for this, it would be nice to be able to just say:
```
String dataFormat = consumerConfig.get(
ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT,
ConsumerConfigConstants.DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT);
```
and then use whatever rendered format to build the data parser. The logic
is much easier to understand that way.
`DEFAULT_STREAM_TIMESTAMP_DATE_FORMAT` would basically be the original
format in `KinesisConfigUtil.initTimestampDateFormat`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---