This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new cc923f1 [Hotfix] RocketMQSourceFunction retains offsets after
multiple retry times of topic route change (#43)
cc923f1 is described below
commit cc923f11d9db734d2e960615aad0ad605f8fe86e
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Jul 27 04:48:35 2022 +0800
[Hotfix] RocketMQSourceFunction retains offsets after multiple retry times
of topic route change (#43)
---
.../flink/legacy/RocketMQSourceFunction.java | 61 ++++++++++++++--------
1 file changed, 38 insertions(+), 23 deletions(-)
diff --git
a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index ac3e4d6..f818eb0 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -50,6 +50,7 @@ import
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Preconditions;
+import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
import
org.apache.flink.shaded.curator5.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.collections.map.LinkedMap;
@@ -243,7 +244,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
try {
long offset =
getMessageQueueOffset(mq);
- PullResult pullResult = null;
+ PullResult pullResult;
if (StringUtils.isEmpty(sql)) {
pullResult =
consumer.pullBlockIfNotFound(
@@ -449,7 +450,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
offsetTable.put(mq, offset);
}
});
- log.info("init offset table from restoredOffsets successful.",
offsetTable);
+ log.info("init offset table [{}] from restoredOffsets successful.",
offsetTable);
}
@Override
@@ -461,33 +462,47 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
return;
}
- // Discovery topic Route change when snapshot
- RetryUtil.call(
- () -> {
- Collection<MessageQueue> totalQueues =
- consumer.fetchSubscribeMessageQueues(topic);
- int taskNumber =
getRuntimeContext().getNumberOfParallelSubtasks();
- int taskIndex =
getRuntimeContext().getIndexOfThisSubtask();
- List<MessageQueue> newQueues =
- RocketMQUtils.allocate(totalQueues, taskNumber,
taskIndex);
- Collections.sort(newQueues);
- log.debug(taskIndex + " Topic route is same.");
- if (!messageQueues.equals(newQueues)) {
- throw new RuntimeException();
- }
- return true;
- },
- "RuntimeException due to topic route changed");
+ Map<MessageQueue, Long> currentOffsets;
+ try {
+ // Discovers topic route change when snapshot
+ RetryUtil.call(
+ () -> {
+ Collection<MessageQueue> totalQueues =
+ consumer.fetchSubscribeMessageQueues(topic);
+ int taskNumber =
getRuntimeContext().getNumberOfParallelSubtasks();
+ int taskIndex =
getRuntimeContext().getIndexOfThisSubtask();
+ List<MessageQueue> newQueues =
+ RocketMQUtils.allocate(totalQueues,
taskNumber, taskIndex);
+ Collections.sort(newQueues);
+ log.debug(taskIndex + " Topic route is same.");
+ if (!messageQueues.equals(newQueues)) {
+ throw new RuntimeException();
+ }
+ return true;
+ },
+ "RuntimeException due to topic route changed");
+
+ unionOffsetStates.clear();
+ currentOffsets = new HashMap<>(offsetTable.size());
+ } catch (RuntimeException e) {
+ log.warn("Retry failed multiple times for topic route change, keep
previous offset.");
+ // If the retry fails for multiple times, the message queue and
its offset in the
+ // previous checkpoint will be retained.
+ List<Tuple2<MessageQueue, Long>> unionOffsets =
+ Lists.newArrayList(unionOffsetStates.get().iterator());
+ Map<MessageQueue, Long> queueOffsets = new
HashMap<>(unionOffsets.size());
+ unionOffsets.forEach(queueOffset ->
queueOffsets.put(queueOffset.f0, queueOffset.f1));
+ currentOffsets = new HashMap<>(unionOffsets.size() +
offsetTable.size());
+ currentOffsets.putAll(queueOffsets);
+ }
- unionOffsetStates.clear();
- HashMap<MessageQueue, Long> currentOffsets = new
HashMap<>(offsetTable.size());
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
currentOffsets.put(entry.getKey(), entry.getValue());
}
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
log.info(
- "Snapshotted state, last processed offsets: {}, checkpoint id:
{}, timestamp: {}",
+ "Snapshot state, last processed offsets: {}, checkpoint id:
{}, timestamp: {}",
offsetTable,
context.getCheckpointId(),
context.getCheckpointTimestamp());
@@ -531,7 +546,7 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
}
@Override
- public TypeInformation getProducedType() {
+ public TypeInformation<OUT> getProducedType() {
return schema.getProducedType();
}