This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git


The following commit(s) were added to refs/heads/main by this push:
     new cc923f1  [Hotfix] RocketMQSourceFunction retains offsets after multiple retry times of topic route change (#43)
cc923f1 is described below

commit cc923f11d9db734d2e960615aad0ad605f8fe86e
Author: Nicholas Jiang <[email protected]>
AuthorDate: Wed Jul 27 04:48:35 2022 +0800

    [Hotfix] RocketMQSourceFunction retains offsets after multiple retry times of topic route change (#43)
---
 .../flink/legacy/RocketMQSourceFunction.java       | 61 ++++++++++++++--------
 1 file changed, 38 insertions(+), 23 deletions(-)

diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
index ac3e4d6..f818eb0 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/RocketMQSourceFunction.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.curator5.com.google.common.collect.Lists;
 import 
org.apache.flink.shaded.curator5.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.commons.collections.map.LinkedMap;
@@ -243,7 +244,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                                             try {
                                                 long offset = 
getMessageQueueOffset(mq);
 
-                                                PullResult pullResult = null;
+                                                PullResult pullResult;
                                                 if (StringUtils.isEmpty(sql)) {
                                                     pullResult =
                                                             
consumer.pullBlockIfNotFound(
@@ -449,7 +450,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
                         offsetTable.put(mq, offset);
                     }
                 });
-        log.info("init offset table from restoredOffsets successful.", 
offsetTable);
+        log.info("init offset table [{}] from restoredOffsets successful.", 
offsetTable);
     }
 
     @Override
@@ -461,33 +462,47 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
             return;
         }
 
-        // Discovery topic Route change when snapshot
-        RetryUtil.call(
-                () -> {
-                    Collection<MessageQueue> totalQueues =
-                            consumer.fetchSubscribeMessageQueues(topic);
-                    int taskNumber = 
getRuntimeContext().getNumberOfParallelSubtasks();
-                    int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
-                    List<MessageQueue> newQueues =
-                            RocketMQUtils.allocate(totalQueues, taskNumber, 
taskIndex);
-                    Collections.sort(newQueues);
-                    log.debug(taskIndex + " Topic route is same.");
-                    if (!messageQueues.equals(newQueues)) {
-                        throw new RuntimeException();
-                    }
-                    return true;
-                },
-                "RuntimeException due to topic route changed");
+        Map<MessageQueue, Long> currentOffsets;
+        try {
+            // Discovers topic route change when snapshot
+            RetryUtil.call(
+                    () -> {
+                        Collection<MessageQueue> totalQueues =
+                                consumer.fetchSubscribeMessageQueues(topic);
+                        int taskNumber = 
getRuntimeContext().getNumberOfParallelSubtasks();
+                        int taskIndex = 
getRuntimeContext().getIndexOfThisSubtask();
+                        List<MessageQueue> newQueues =
+                                RocketMQUtils.allocate(totalQueues, 
taskNumber, taskIndex);
+                        Collections.sort(newQueues);
+                        log.debug(taskIndex + " Topic route is same.");
+                        if (!messageQueues.equals(newQueues)) {
+                            throw new RuntimeException();
+                        }
+                        return true;
+                    },
+                    "RuntimeException due to topic route changed");
+
+            unionOffsetStates.clear();
+            currentOffsets = new HashMap<>(offsetTable.size());
+        } catch (RuntimeException e) {
+            log.warn("Retry failed multiple times for topic route change, keep 
previous offset.");
+            // If the retry fails for multiple times, the message queue and 
its offset in the
+            // previous checkpoint will be retained.
+            List<Tuple2<MessageQueue, Long>> unionOffsets =
+                    Lists.newArrayList(unionOffsetStates.get().iterator());
+            Map<MessageQueue, Long> queueOffsets = new 
HashMap<>(unionOffsets.size());
+            unionOffsets.forEach(queueOffset -> 
queueOffsets.put(queueOffset.f0, queueOffset.f1));
+            currentOffsets = new HashMap<>(unionOffsets.size() + 
offsetTable.size());
+            currentOffsets.putAll(queueOffsets);
+        }
 
-        unionOffsetStates.clear();
-        HashMap<MessageQueue, Long> currentOffsets = new 
HashMap<>(offsetTable.size());
         for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
             unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
             currentOffsets.put(entry.getKey(), entry.getValue());
         }
         pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
         log.info(
-                "Snapshotted state, last processed offsets: {}, checkpoint id: 
{}, timestamp: {}",
+                "Snapshot state, last processed offsets: {}, checkpoint id: 
{}, timestamp: {}",
                 offsetTable,
                 context.getCheckpointId(),
                 context.getCheckpointTimestamp());
@@ -531,7 +546,7 @@ public class RocketMQSourceFunction<OUT> extends 
RichParallelSourceFunction<OUT>
     }
 
     @Override
-    public TypeInformation getProducedType() {
+    public TypeInformation<OUT> getProducedType() {
         return schema.getProducedType();
     }
 

Reply via email to