[
https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16069509#comment-16069509
]
ASF GitHub Bot commented on FLINK-6352:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/3915#discussion_r124965642
--- Diff: flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java ---
@@ -128,6 +135,53 @@ public FlinkKafkaConsumer010(List<String> topics, KeyedDeserializationSchema<T>
 	}

 	@Override
+	public FlinkKafkaConsumerBase<T> setStartFromSpecificDate(Date date) {
+		Preconditions.checkArgument(null != date && date.getTime() <= System.currentTimeMillis(),
+			"Startup time must be before the current time.");
+		this.startupMode = StartupMode.SPECIFIC_TIMESTAMP;
+		this.specificStartupDate = date;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Converts Flink topic partitions to Kafka topic partitions.
+	 * @param flinkTopicPartitionMap map from Flink topic partition to offset
+	 * @return map from Kafka topic partition to offset
+	 */
+	private Map<TopicPartition, Long> convertFlinkToKafkaTopicPartition(Map<KafkaTopicPartition, Long> flinkTopicPartitionMap) {
+		Map<TopicPartition, Long> topicPartitionMap = new HashMap<>(flinkTopicPartitionMap.size());
+		for (Map.Entry<KafkaTopicPartition, Long> entry : flinkTopicPartitionMap.entrySet()) {
+			topicPartitionMap.put(new TopicPartition(entry.getKey().getTopic(), entry.getKey().getPartition()), entry.getValue());
+		}
+		return topicPartitionMap;
+	}
+
+	/**
+	 * Searches the offset for the given timestamp for each topic partition in Kafka.
+	 * If no such offset exists, the latest offset is used.
+	 * @param partitionTimesMap Kafka topic partitions and timestamps
+	 * @return Kafka topic partitions and the earliest offset after the timestamp;
+	 *         if no such offset exists, the latest offset in Kafka
+	 */
+	private Map<KafkaTopicPartition, Long> convertTimestampToOffset(Map<KafkaTopicPartition, Long> partitionTimesMap) {
--- End diff ---
I think we need to move this conversion logic to `KafkaConsumerThread`,
otherwise we would be instantiating a KafkaConsumer just for the sake of
fetching timestamp-based offsets.
That's where the conversions from `KafkaTopicPartitionStateSentinel` values to actual
offsets take place.
See `KafkaConsumerThread` lines 369 - 390
```java
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
//   (1) starting fresh,
//   (2) checkpoint / savepoint state we were restored with had not completely
//       been replaced with actual offset values yet, or
//   (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represents.
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
	if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
		consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
		consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
		// the KafkaConsumer by default will automatically seek the consumer position
		// to the committed group offset, so we do not need to do it.
		newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
	} else {
		consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
	}
}
```
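As a side note, the fallback rule the quoted Javadoc describes ("if no offset exists for the timestamp, use the latest offset") reduces to a small merge step over the lookup results. A minimal, self-contained sketch of that rule, not Flink code: the class and method names are hypothetical, and plain `String` keys stand in for `KafkaTopicPartition`:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the "fall back to latest offset" rule.
// offsetsForTimestamp models the result of a timestamp-based offset lookup,
// where a missing entry means no offset was found for the timestamp.
public class TimestampOffsetFallback {

	public static Map<String, Long> resolve(
			Map<String, Long> offsetsForTimestamp,
			Map<String, Long> latestOffsets) {

		Map<String, Long> resolved = new HashMap<>(latestOffsets.size());
		for (Map.Entry<String, Long> entry : latestOffsets.entrySet()) {
			Long fromTimestamp = offsetsForTimestamp.get(entry.getKey());
			// use the timestamp-based offset when found, else the latest offset
			resolved.put(entry.getKey(), fromTimestamp != null ? fromTimestamp : entry.getValue());
		}
		return resolved;
	}
}
```

With this shape, the logic is trivially unit-testable without instantiating a KafkaConsumer, which is part of the motivation for moving the conversion into `KafkaConsumerThread`.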
> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -------------------------------------------------------------------------
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
> Issue Type: Improvement
> Components: Kafka Connector
> Reporter: Fang Yong
> Assignee: Fang Yong
> Fix For: 1.4.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of
> FlinkKafkaConsumer, and its value must be earliest/latest/none. This method
> can only make the job consume from the beginning or the most recent data, but
> cannot specify a particular Kafka offset to start consuming from.
> So, there should be a configuration item (such as
> "flink.source.start.time", with the format "yyyy-MM-dd HH:mm:ss") that
> allows the user to configure the initial offset of Kafka. The behavior of
> "flink.source.start.time" is as follows:
> 1) the job starts from a checkpoint / savepoint
> a> the offset of the partition can be restored from the checkpoint/savepoint;
> "flink.source.start.time" will be ignored.
> b> there is no checkpoint/savepoint for the partition (for example, the
> partition was newly added); "flink.source.start.time" will be used to
> initialize the offset of the partition
> 2) the job has no checkpoint / savepoint; "flink.source.start.time" is used
> to initialize the Kafka offset
> a> "flink.source.start.time" is valid: use it to set the Kafka offset
> b> "flink.source.start.time" is out of range: behave the same as currently
> with no initial offset, i.e. get Kafka's current offset and start reading
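Turning the proposed "yyyy-MM-dd HH:mm:ss" configuration value into the epoch-millisecond timestamp a timestamp-to-offset lookup would need could look roughly like the sketch below. The helper name is hypothetical, and the UTC time zone is my assumption since the issue description does not specify one:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical sketch: parse the proposed "flink.source.start.time" value
// into epoch milliseconds for a timestamp-based offset lookup.
public class StartTimeParser {

	public static long parseStartTime(String value) {
		SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
		// assume UTC so the parsed instant is deterministic across machines
		fmt.setTimeZone(TimeZone.getTimeZone("UTC"));
		try {
			return fmt.parse(value).getTime();
		} catch (ParseException e) {
			throw new IllegalArgumentException("Invalid flink.source.start.time: " + value, e);
		}
	}
}
```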
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)