This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new 1576ae2 [ISSUE #103] Fix seek offset and deserialize bug in newSource
(#104)
1576ae2 is described below
commit 1576ae2d4d51604a464df0fb21074f67c9e50c0f
Author: hejunjie <[email protected]>
AuthorDate: Mon Dec 4 14:34:52 2023 +0800
[ISSUE #103] Fix seek offset and deserialize bug in newSource (#104)
---
pom.xml | 1 +
.../flink/connector/rocketmq/source/InnerConsumerImpl.java | 13 ++++++++++---
.../enumerator/RocketMQSourceEnumStateSerializer.java | 5 ++---
.../source/enumerator/RocketMQSourceEnumerator.java | 11 ++++++-----
.../rocketmq/source/reader/RocketMQSplitReader.java | 3 ---
5 files changed, 19 insertions(+), 14 deletions(-)
diff --git a/pom.xml b/pom.xml
index 67f322d..2eddd1a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,6 +46,7 @@
<junit.version>4.13.2</junit.version>
<junit-jupiter.version>5.9.2</junit-jupiter.version>
<powermock.version>1.7.4</powermock.version>
+ <jaxb-api.version>2.3.1</jaxb-api.version>
</properties>
<dependencies>
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index edc438b..317031f 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,6 +17,7 @@
package org.apache.flink.connector.rocketmq.source;
+import com.alibaba.fastjson.JSON;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -25,8 +26,6 @@ import
org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
-
-import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -240,7 +239,15 @@ public class InnerConsumerImpl implements InnerConsumer {
long offset =
consumer.getOffsetStore()
.readOffset(messageQueue,
ReadOffsetType.READ_FROM_STORE);
- LOG.error(
+
+ if (offset == -1) {
+ offset = adminExt.minOffset(messageQueue);
+ LOG.info(
+ "Consumer seek committed offset from
remote, offset=-1,mq={},use minOffset={}",
+ UtilAll.getQueueDescription(messageQueue),
+ offset);
+ }
+ LOG.info(
"Consumer seek committed offset from remote,
mq={}, offset={}",
UtilAll.getQueueDescription(messageQueue),
offset);
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 7589ba4..805df1b 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,9 +18,8 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
import com.alibaba.fastjson.JSON;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +92,7 @@ public class RocketMQSourceEnumStateSerializer
String topic = in.readUTF();
int queueId = in.readInt();
- MessageQueue queue = new MessageQueue(brokerName, topic,
queueId);
+ MessageQueue queue = new MessageQueue(topic, brokerName,
queueId);
result.add(queue);
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 6103444..77c0c33 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,6 +18,8 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Sets;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
@@ -34,15 +36,11 @@ import
org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt
import
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.util.FlinkRuntimeException;
-
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -166,6 +164,9 @@ public class RocketMQSourceEnumerator
*/
@Override
public void addSplitsBack(List<RocketMQSourceSplit> splits, int subtaskId)
{
+ SourceSplitChangeResult sourceSplitChangeResult =
+ new SourceSplitChangeResult(new HashSet<>(splits));
+ this.calculateSplitAssignment(sourceSplitChangeResult);
// If the failed subtask has already restarted, we need to assign
splits to it
if (context.registeredReaders().containsKey(subtaskId)) {
sendSplitChangesToRemote(Collections.singleton(subtaskId));
@@ -321,7 +322,7 @@ public class RocketMQSourceEnumerator
}
final Set<RocketMQSourceSplit> pendingAssignmentForReader =
- this.pendingSplitAssignmentMap.get(pendingReader);
+ this.pendingSplitAssignmentMap.remove(pendingReader);
// Put pending assignment into incremental assignment
if (pendingAssignmentForReader != null &&
!pendingAssignmentForReader.isEmpty()) {
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
index b0bca4a..3a07966 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
@@ -36,13 +36,11 @@ import
org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -190,7 +188,6 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
public void wakeUp() {
LOG.debug("Wake up the split reader in case the fetcher thread is
blocking in fetch().");
wakeup = true;
- this.consumer.wakeup();
}
@Override