This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new b686501 [ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96)
b686501 is described below
commit b6865013b7a54883727a2b4cd712087d6e8a6d4d
Author: Humkum <[email protected]>
AuthorDate: Wed Jan 10 17:29:24 2024 +0800
[ISSUE #95] Fix source can not consume new queue's messages when topic queue expansion (#96)
---
.../connector/rocketmq/legacy/RocketMQSourceFunction.java | 14 +++++++++++++-
.../legacy/sourceFunction/RocketMQSourceFunctionTest.java | 14 +++++++++++++-
2 files changed, 26 insertions(+), 2 deletions(-)
diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index e445643..bedf97f 100644
--- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -62,6 +62,7 @@ import org.slf4j.LoggerFactory;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -536,7 +537,7 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
}
}
- public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue> messageQueues) throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be null");
restoredOffsets.forEach(
(mq, offset) -> {
@@ -544,6 +545,17 @@ public class RocketMQSourceFunction<OUT> extends RichParallelSourceFunction<OUT>
offsetTable.put(mq, offset);
}
});
+
+ List<MessageQueue> extMessageQueue = new ArrayList<>();
+ for (MessageQueue messageQueue : messageQueues) {
+ if (!offsetTable.containsKey(messageQueue)) {
+ extMessageQueue.add(messageQueue);
+ }
+ }
+ if (extMessageQueue.size() != 0) {
+ log.info("no restoredOffsets for {}, so init offset for these queues", extMessageQueue);
+ initOffsets(extMessageQueue);
+ }
log.info("init offset table [{}] from restoredOffsets successful.",
offsetTable);
}
diff --git a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index cd514cd..08371b3 100644
--- a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++ b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -25,12 +25,16 @@ import org.apache.flink.connector.rocketmq.legacy.common.config.StartupMode;
import org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStringDeserializationSchema;
import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@@ -69,6 +73,8 @@ public class RocketMQSourceFunctionTest {
@Test
public void testRestartFromCheckpoint() throws Exception {
+ DefaultLitePullConsumer consumer = Mockito.mock(DefaultLitePullConsumer.class);
+ Mockito.when(consumer.committed(Mockito.any())).thenReturn(40L);
Properties properties = new Properties();
properties.setProperty(RocketMQConfig.CONSUMER_GROUP,
"${ConsumerGroup}");
properties.setProperty(RocketMQConfig.CONSUMER_TOPIC,
"${SourceTopic}");
@@ -82,13 +88,19 @@ public class RocketMQSourceFunctionTest {
map.put(new MessageQueue("tpc", "broker-0", 1), 21L);
map.put(new MessageQueue("tpc", "broker-1", 0), 30L);
map.put(new MessageQueue("tpc", "broker-1", 1), 31L);
+ List<MessageQueue> allocateMessageQueues = new ArrayList<>(map.keySet());
+ MessageQueue newMessageQueue = new MessageQueue("tpc", "broker-2", 0);
+ allocateMessageQueues.add(newMessageQueue);
+ TestUtils.setFieldValue(source, "messageQueues", allocateMessageQueues);
+ TestUtils.setFieldValue(source, "consumer", consumer);
TestUtils.setFieldValue(source, "restoredOffsets", map);
TestUtils.setFieldValue(source, "offsetTable", new ConcurrentHashMap<>());
- source.initOffsetTableFromRestoredOffsets(new ArrayList<>(map.keySet()));
+ source.initOffsetTableFromRestoredOffsets(allocateMessageQueues);
Map<MessageQueue, Long> offsetTable = (Map)
TestUtils.getFieldValue(source, "offsetTable");
for (Map.Entry<MessageQueue, Long> entry : map.entrySet()) {
assertEquals(offsetTable.containsKey(entry.getKey()), true);
assertEquals(offsetTable.containsValue(entry.getValue()), true);
}
+ assertEquals(offsetTable.containsKey(newMessageQueue), true);
}
}