This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
The following commit(s) were added to refs/heads/main by this push:
new fce246b [RIP-74] Implement dynamic load balancing in
Flink-Connector-RocketMQ (#126)
fce246b is described below
commit fce246bda5d2a17edd6d793f3b6cb54deb18872d
Author: hqbfz <[email protected]>
AuthorDate: Thu Mar 6 17:18:02 2025 +0800
[RIP-74] Implement dynamic load balancing in Flink-Connector-RocketMQ (#126)
---
.../rocketmq/common/event/SourceCheckEvent.java} | 27 +-
.../rocketmq/common/event/SourceDetectEvent.java} | 25 +-
.../common/event/SourceInitAssignEvent.java} | 25 +-
.../common/event/SourceReportOffsetEvent.java | 60 ++++
.../connector/rocketmq/common/lock/SpinLock.java | 35 ++
.../rocketmq/legacy/RocketMQSourceFunction.java | 3 +-
.../rocketmq/source/InnerConsumerImpl.java | 3 +-
.../connector/rocketmq/source/RocketMQSource.java | 18 +-
.../RocketMQSourceEnumStateSerializer.java | 3 +-
.../enumerator/RocketMQSourceEnumerator.java | 355 +++++++++++++++++++--
.../enumerator/allocate/AllocateStrategy.java | 13 +
.../allocate/AllocateStrategyFactory.java | 3 +
.../allocate/AverageAllocateStrategy.java | 43 +++
.../allocate/BroadcastAllocateStrategy.java | 6 +
.../allocate/ConsistentHashAllocateStrategy.java | 6 +
.../metrics/RocketMQSourceReaderMetrics.java | 2 +
.../reader/RocketMQSourceFetcherManager.java | 4 +
.../source/reader/RocketMQSourceReader.java | 99 +++++-
.../source/reader/RocketMQSplitReader.java | 101 ++++--
.../split/RocketMQPartitionSplitSerializer.java | 8 +-
.../rocketmq/source/split/RocketMQSourceSplit.java | 63 +++-
.../source/split/RocketMQSourceSplitState.java | 7 +-
.../sourceFunction/RocketMQSourceFunctionTest.java | 1 -
.../allocate/AverageAllocateStrategyTest.java | 97 ++++++
.../RocketMQPartitionSplitSerializerTest.java | 52 +++
25 files changed, 952 insertions(+), 107 deletions(-)
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
similarity index 52%
copy from
src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to
src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
index f1fa5af..70a7206 100644
---
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceCheckEvent.java
@@ -16,24 +16,23 @@
* limitations under the License.
*/
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.java.tuple.Tuple2;
-import java.io.IOException;
+import org.apache.rocketmq.common.message.MessageQueue;
-import static org.junit.Assert.assertEquals;
+import java.util.Map;
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+public class SourceCheckEvent implements SourceEvent {
+ private Map<MessageQueue, Tuple2<Long, Long>> assignedMq;
- @Test
- public void testSerializePartitionSplit() throws IOException {
- RocketMQPartitionSplitSerializer serializer = new
RocketMQPartitionSplitSerializer();
- RocketMQSourceSplit expected =
- new RocketMQSourceSplit("test-split-serialization",
"taobaodaily", 256, 100, 300);
- RocketMQSourceSplit actual =
- serializer.deserialize(serializer.getVersion(),
serializer.serialize(expected));
- assertEquals(expected, actual);
+ public Map<MessageQueue, Tuple2<Long, Long>> getAssignedMq() {
+ return assignedMq;
+ }
+
+ public void setAssignedMq(Map<MessageQueue, Tuple2<Long, Long>>
assignedMq) {
+ this.assignedMq = assignedMq;
}
}
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
similarity index 52%
copy from
src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to
src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
index f1fa5af..af8c4e3 100644
---
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceDetectEvent.java
@@ -16,24 +16,19 @@
* limitations under the License.
*/
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
-import java.io.IOException;
+public class SourceDetectEvent implements SourceEvent {
+ // Request to resend the initial allocation result
+ private boolean reSendInitAssign = true;
-import static org.junit.Assert.assertEquals;
-
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+ public boolean getReSendInitAssign() {
+ return reSendInitAssign;
+ }
- @Test
- public void testSerializePartitionSplit() throws IOException {
- RocketMQPartitionSplitSerializer serializer = new
RocketMQPartitionSplitSerializer();
- RocketMQSourceSplit expected =
- new RocketMQSourceSplit("test-split-serialization",
"taobaodaily", 256, 100, 300);
- RocketMQSourceSplit actual =
- serializer.deserialize(serializer.getVersion(),
serializer.serialize(expected));
- assertEquals(expected, actual);
+ public void setReSendInitAssign(boolean reSendInitAssign) {
+ this.reSendInitAssign = reSendInitAssign;
}
}
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
similarity index 52%
copy from
src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
copy to
src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
index f1fa5af..347ec51 100644
---
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceInitAssignEvent.java
@@ -16,24 +16,21 @@
* limitations under the License.
*/
-package org.apache.flink.connector.rocketmq.source.split;
+package org.apache.flink.connector.rocketmq.common.event;
-import org.junit.Test;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
-import java.io.IOException;
+import java.util.List;
-import static org.junit.Assert.assertEquals;
+public class SourceInitAssignEvent implements SourceEvent {
+ private List<RocketMQSourceSplit> splits;
-/** Test for {@link RocketMQPartitionSplitSerializer}. */
-public class RocketMQPartitionSplitSerializerTest {
+ public void setSplits(List<RocketMQSourceSplit> splits) {
+ this.splits = splits;
+ }
- @Test
- public void testSerializePartitionSplit() throws IOException {
- RocketMQPartitionSplitSerializer serializer = new
RocketMQPartitionSplitSerializer();
- RocketMQSourceSplit expected =
- new RocketMQSourceSplit("test-split-serialization",
"taobaodaily", 256, 100, 300);
- RocketMQSourceSplit actual =
- serializer.deserialize(serializer.getVersion(),
serializer.serialize(expected));
- assertEquals(expected, actual);
+ public List<RocketMQSourceSplit> getSplits() {
+ return splits;
}
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
new file mode 100644
index 0000000..d850256
--- /dev/null
+++
b/src/main/java/org/apache/flink/connector/rocketmq/common/event/SourceReportOffsetEvent.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.event;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+
+public class SourceReportOffsetEvent implements SourceEvent {
+ private String topic;
+ private String broker;
+ private int queueId;
+ private long checkpoint = -1;
+
+ public void setBroker(String broker) {
+ this.broker = broker;
+ }
+
+ public void setCheckpoint(long checkpoint) {
+ this.checkpoint = checkpoint;
+ }
+
+ public void setQueueId(int queueId) {
+ this.queueId = queueId;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public long getCheckpoint() {
+ return checkpoint;
+ }
+
+ public int getQueueId() {
+ return queueId;
+ }
+
+ public String getBroker() {
+ return broker;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
new file mode 100644
index 0000000..dbb32c2
--- /dev/null
+++
b/src/main/java/org/apache/flink/connector/rocketmq/common/lock/SpinLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.rocketmq.common.lock;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class SpinLock {
+ private AtomicBoolean lock = new AtomicBoolean(false);
+
+ public void lock() {
+ boolean lock = false;
+ do {
+ lock = this.lock.compareAndSet(false, true);
+ } while (!lock);
+ }
+
+ public void unlock() {
+ this.lock.set(false);
+ }
+}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
index bedf97f..91d1d39 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java
@@ -537,7 +537,8 @@ public class RocketMQSourceFunction<OUT> extends
RichParallelSourceFunction<OUT>
}
}
- public void initOffsetTableFromRestoredOffsets(List<MessageQueue>
messageQueues) throws MQClientException {
+ public void initOffsetTableFromRestoredOffsets(List<MessageQueue>
messageQueues)
+ throws MQClientException {
Preconditions.checkNotNull(restoredOffsets, "restoredOffsets can't be
null");
restoredOffsets.forEach(
(mq, offset) -> {
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
index 317031f..a76a198 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/InnerConsumerImpl.java
@@ -17,7 +17,6 @@
package org.apache.flink.connector.rocketmq.source;
-import com.alibaba.fastjson.JSON;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
@@ -26,6 +25,8 @@ import
org.apache.flink.connector.rocketmq.source.reader.MessageViewExt;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.StringUtils;
+
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
index 25fd8c7..cfdc0a1 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/RocketMQSource.java
@@ -124,13 +124,14 @@ public class RocketMQSource<OUT>
final RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics =
new RocketMQSourceReaderMetrics(readerContext.metricGroup());
- Supplier<SplitReader<MessageView, RocketMQSourceSplit>>
splitReaderSupplier =
- () ->
- new RocketMQSplitReader<>(
- configuration,
- readerContext,
- deserializationSchema,
- rocketMQSourceReaderMetrics);
+ // unique reader
+ RocketMQSplitReader<OUT> reader =
+ new RocketMQSplitReader<>(
+ configuration,
+ readerContext,
+ deserializationSchema,
+ rocketMQSourceReaderMetrics);
+ Supplier<SplitReader<MessageView, RocketMQSourceSplit>>
splitReaderSupplier = () -> reader;
RocketMQSourceFetcherManager rocketmqSourceFetcherManager =
new RocketMQSourceFetcherManager(
@@ -145,7 +146,8 @@ public class RocketMQSource<OUT>
recordEmitter,
configuration,
readerContext,
- rocketMQSourceReaderMetrics);
+ rocketMQSourceReaderMetrics,
+ splitReaderSupplier);
}
@Override
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
index 805df1b..960e622 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumStateSerializer.java
@@ -18,8 +18,9 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
-import com.alibaba.fastjson.JSON;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
index 77c0c33..c228309 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/RocketMQSourceEnumerator.java
@@ -18,16 +18,21 @@
package org.apache.flink.connector.rocketmq.source.enumerator;
-import com.alibaba.fastjson.JSON;
-import com.google.common.collect.Sets;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import
org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,18 +41,26 @@ import
org.apache.flink.connector.rocketmq.source.enumerator.allocate.AllocateSt
import
org.apache.flink.connector.rocketmq.source.enumerator.offset.OffsetsSelector;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.util.FlinkRuntimeException;
+
+import com.alibaba.fastjson.JSON;
+import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -69,18 +82,31 @@ public class RocketMQSourceEnumerator
private final OffsetsSelector startingOffsetsSelector;
private final OffsetsSelector stoppingOffsetsSelector;
+ // Used for queue dynamic allocation
+ private final Map<MessageQueue, Long> checkedOffsets;
+ private boolean[] initTask;
+
// The internal states of the enumerator.
// This set is only accessed by the partition discovery callable in the
callAsync() method.
// The current assignment by reader id. Only accessed by the coordinator
thread.
// The discovered and initialized partition splits that are waiting for
owner reader to be
// ready.
- private final Set<MessageQueue> allocatedSet;
+ private final Map<MessageQueue, Byte> allocatedSet;
private final Map<Integer, Set<RocketMQSourceSplit>>
pendingSplitAssignmentMap;
+ // Only maintains the mapping relationship
+ private final Map<Integer, Set<RocketMQSourceSplit>> assignedMap;
+ private final Map<MessageQueue, Integer /* taskId */>
reflectedQueueToTaskId;
+
// Param from configuration
private final String groupId;
private final long partitionDiscoveryIntervalMs;
+ // Indicates the number of allocated queues
+ private int partitionId;
+ private final SpinLock lock;
+ private ScheduledExecutorService scheduledExecutorService;
+
public RocketMQSourceEnumerator(
OffsetsSelector startingOffsetsSelector,
OffsetsSelector stoppingOffsetsSelector,
@@ -107,13 +133,19 @@ public class RocketMQSourceEnumerator
this.configuration = configuration;
this.context = context;
this.boundedness = boundedness;
+ this.lock = new SpinLock();
// Support allocate splits to reader
+ this.checkedOffsets = new ConcurrentHashMap<>();
+ this.reflectedQueueToTaskId = new ConcurrentHashMap<>();
this.pendingSplitAssignmentMap = new ConcurrentHashMap<>();
- this.allocatedSet = new HashSet<>(currentSplitAssignment);
+ this.allocatedSet = new ConcurrentHashMap<>();
+ this.assignedMap = new ConcurrentHashMap<>();
this.allocateStrategy =
AllocateStrategyFactory.getStrategy(
- configuration, context, new
RocketMQSourceEnumState(allocatedSet));
+ configuration,
+ context,
+ new RocketMQSourceEnumState(currentSplitAssignment));
// For rocketmq setting
this.groupId =
configuration.getString(RocketMQSourceOptions.CONSUMER_GROUP);
@@ -121,12 +153,23 @@ public class RocketMQSourceEnumerator
this.stoppingOffsetsSelector = stoppingOffsetsSelector;
this.partitionDiscoveryIntervalMs =
configuration.getLong(RocketMQSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS);
+ this.scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+
+ // Initialize the task status
+ log.info(
+ "Starting the RocketMQSourceEnumerator with current split
assignment: {}",
+ currentSplitAssignment);
+ if (!currentSplitAssignment.isEmpty()) {
+ this.initTask = new boolean[context.currentParallelism()];
+ }
}
@Override
public void start() {
consumer = new InnerConsumerImpl(configuration);
consumer.start();
+ scheduledExecutorService.scheduleAtFixedRate(
+ this::notifyAssignResult, 30 * 1000, 30 * 1000,
TimeUnit.MILLISECONDS);
if (partitionDiscoveryIntervalMs > 0) {
log.info(
@@ -190,7 +233,7 @@ public class RocketMQSourceEnumerator
@Override
public RocketMQSourceEnumState snapshotState(long checkpointId) {
- return new RocketMQSourceEnumState(allocatedSet);
+ return new RocketMQSourceEnumState(allocatedSet.keySet());
}
@Override
@@ -205,9 +248,32 @@ public class RocketMQSourceEnumerator
}
}
+ @Override
+ public void handleSourceEvent(int taskId, SourceEvent sourceEvent) {
+ if (sourceEvent instanceof SourceReportOffsetEvent) {
+ handleOffsetEvent(taskId, (SourceReportOffsetEvent) sourceEvent);
+ } else if (sourceEvent instanceof SourceInitAssignEvent) {
+ handleInitAssignEvent(taskId, (SourceInitAssignEvent) sourceEvent);
+ }
+ }
+
// ----------------- private methods -------------------
private Set<MessageQueue> requestServiceDiscovery() {
+ // Ensure all subtasks have been initialized
+ try {
+ if (initTask != null) {
+ for (int i = 0; i < context.currentParallelism(); i++) {
+ if (!initTask[i]) {
+ context.sendEventToSourceReader(i, new
SourceDetectEvent());
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("init request resend error, please check task has
started");
+ return null;
+ }
+
Set<String> topicSet =
Sets.newHashSet(
configuration
@@ -235,6 +301,9 @@ public class RocketMQSourceEnumerator
if (t != null) {
throw new FlinkRuntimeException("Failed to handle source splits
change due to ", t);
}
+ if (latestSet == null) {
+ return;
+ }
final SourceChangeResult sourceChangeResult =
getSourceChangeResult(latestSet);
if (sourceChangeResult.isEmpty()) {
@@ -248,30 +317,59 @@ public class RocketMQSourceEnumerator
// This method should only be invoked in the coordinator executor thread.
private SourceSplitChangeResult initializeSourceSplits(SourceChangeResult
sourceChangeResult) {
+ lock.lock();
+
Set<MessageQueue> increaseSet = sourceChangeResult.getIncreaseSet();
+ Set<MessageQueue> decreaseSet = sourceChangeResult.getDecreaseSet();
OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
new InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
- Map<MessageQueue, Long> startingOffsets =
+ Map<MessageQueue, Long> increaseStartingOffsets =
startingOffsetsSelector.getMessageQueueOffsets(increaseSet,
offsetsRetriever);
- Map<MessageQueue, Long> stoppingOffsets =
+ Map<MessageQueue, Long> increaseStoppingOffsets =
stoppingOffsetsSelector.getMessageQueueOffsets(increaseSet,
offsetsRetriever);
+ Map<MessageQueue, Long> decreaseStoppingOffsets =
+ stoppingOffsetsSelector.getMessageQueueOffsets(decreaseSet,
offsetsRetriever);
+ Map<MessageQueue, Long> decreaseStartingOffsets =
+ startingOffsetsSelector.getMessageQueueOffsets(decreaseSet,
offsetsRetriever);
Set<RocketMQSourceSplit> increaseSplitSet =
increaseSet.stream()
.map(
mq -> {
- long startingOffset =
startingOffsets.get(mq);
+ long startingOffset =
increaseStartingOffsets.get(mq);
long stoppingOffset =
- stoppingOffsets.getOrDefault(
+
increaseStoppingOffsets.getOrDefault(
mq,
RocketMQSourceSplit.NO_STOPPING_OFFSET);
return new RocketMQSourceSplit(
mq, startingOffset,
stoppingOffset);
})
.collect(Collectors.toSet());
+ // Update cache
+ increaseSet.forEach(
+ mq ->
+ checkedOffsets.put(
+ mq,
+ increaseStartingOffsets.getOrDefault(
+ mq,
RocketMQSourceSplit.NO_STOPPING_OFFSET)));
+
+ Set<RocketMQSourceSplit> decreaseSplitSet =
+ decreaseSet.stream()
+ .map(
+ mq -> {
+ long startingOffset =
decreaseStartingOffsets.get(mq);
+ long stoppingOffset =
+
decreaseStoppingOffsets.getOrDefault(
+ mq,
RocketMQSourceSplit.NO_STOPPING_OFFSET);
+ allocatedSet.remove(mq);
+ checkedOffsets.remove(mq);
+ return new RocketMQSourceSplit(
+ mq, startingOffset,
stoppingOffset, false);
+ })
+ .collect(Collectors.toSet());
- return new SourceSplitChangeResult(increaseSplitSet,
sourceChangeResult.getDecreaseSet());
+ return new SourceSplitChangeResult(increaseSplitSet, decreaseSplitSet);
}
/**
@@ -291,15 +389,145 @@ public class RocketMQSourceEnumerator
if (partitionDiscoveryIntervalMs <= 0) {
log.info("Split changes, but dynamic partition discovery is
disabled.");
}
- this.calculateSplitAssignment(sourceSplitChangeResult);
- this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+ try {
+ this.calculateSplitAssignment(sourceSplitChangeResult);
+
this.sendSplitChangesToRemote(context.registeredReaders().keySet());
+ } finally {
+ lock.unlock();
+ }
}
/** Calculate new split assignment according allocate strategy */
private void calculateSplitAssignment(SourceSplitChangeResult
sourceSplitChangeResult) {
- Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap =
- this.allocateStrategy.allocate(
- sourceSplitChangeResult.getIncreaseSet(),
context.currentParallelism());
+ Map<Integer, Set<RocketMQSourceSplit>> newSourceSplitAllocateMap;
+
+ // Preliminary calculation of distribution results
+ {
+ // Allocate valid queues
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ partitionId = 0;
+
+ // Re-load balancing
+ Set<RocketMQSourceSplit> allMQ = new HashSet<>();
+ OffsetsSelector.MessageQueueOffsetsRetriever offsetsRetriever =
+ new
InnerConsumerImpl.RemotingOffsetsRetrieverImpl(consumer);
+ Map<MessageQueue, Long> stoppingOffsets =
+ stoppingOffsetsSelector.getMessageQueueOffsets(
+ allocatedSet.keySet(), offsetsRetriever);
+ Set<MessageQueue> delete =
+ sourceSplitChangeResult.decreaseSet.stream()
+ .map(RocketMQSourceSplit::getMessageQueue)
+ .collect(Collectors.toSet());
+
+ // Calculate all queues
+ allMQ.addAll(sourceSplitChangeResult.increaseSet);
+ allocatedSet
+ .keySet()
+ .forEach(
+ mq -> {
+ if (!delete.contains(mq)) {
+ allMQ.add(
+ new RocketMQSourceSplit(
+ mq,
+ checkedOffsets.get(mq),
+
stoppingOffsets.getOrDefault(
+ mq,
+
RocketMQSourceSplit
+
.NO_STOPPING_OFFSET)));
+ }
+ });
+ newSourceSplitAllocateMap =
+ this.allocateStrategy.allocate(
+ allMQ, context.currentParallelism(),
partitionId);
+
+ // Update cache
+ assignedMap.clear();
+ for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+ assignedMap.put(
+ (Integer) entry.getKey(),
+ ((Set<RocketMQSourceSplit>) entry.getValue())
+ .stream()
+ .map(RocketMQSourceSplit::clone)
+ .collect(Collectors.toSet()));
+ }
+ partitionId = allMQ.size();
+ } else {
+ newSourceSplitAllocateMap =
+ this.allocateStrategy.allocate(
+ sourceSplitChangeResult.getIncreaseSet(),
+ context.currentParallelism(),
+ partitionId);
+
+ // Update cache
+ newSourceSplitAllocateMap.forEach(
+ (k, v) ->
+ v.forEach(
+ mq ->
+ assignedMap
+ .computeIfAbsent(k, r
-> new HashSet<>())
+ .add(mq)));
+ partitionId += sourceSplitChangeResult.getIncreaseSet().size();
+ }
+
+ // Allocate deleted queues
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ sourceSplitChangeResult.decreaseSet.forEach(
+ mq -> {
+ newSourceSplitAllocateMap
+ .computeIfAbsent(
+
reflectedQueueToTaskId.get(mq.getMessageQueue()),
+ k -> new HashSet<>())
+ .add(mq);
+
reflectedQueueToTaskId.remove(mq.getMessageQueue());
+ });
+ }
+ }
+
+ {
+ // Calculate the result after queue migration
+ if (sourceSplitChangeResult.decreaseSet != null
+ && !sourceSplitChangeResult.decreaseSet.isEmpty()) {
+ Map<Integer, Set<RocketMQSourceSplit>> migrationQueue = new
HashMap<>();
+ Map<Integer, Set<RocketMQSourceSplit>> noMigrationQueue = new
HashMap<>();
+ for (Map.Entry entry : newSourceSplitAllocateMap.entrySet()) {
+ int taskId = (int) entry.getKey();
+ Set<RocketMQSourceSplit> splits =
(Set<RocketMQSourceSplit>) entry.getValue();
+ for (RocketMQSourceSplit split : splits) {
+ if (!split.getIsIncrease()) {
+ continue;
+ }
+ if (taskId !=
reflectedQueueToTaskId.get(split.getMessageQueue())) {
+ migrationQueue
+ .computeIfAbsent(
+
reflectedQueueToTaskId.get(split.getMessageQueue()),
+ k -> new HashSet<>())
+ .add(
+ new RocketMQSourceSplit(
+ split.getMessageQueue(),
+ split.getStartingOffset(),
+ split.getStoppingOffset(),
+ false));
+ } else {
+ noMigrationQueue
+ .computeIfAbsent(taskId, k -> new
HashSet<>())
+ .add(split);
+ }
+ }
+ }
+
+ // Final result
+ migrationQueue.forEach(
+ (taskId, splits) -> {
+
newSourceSplitAllocateMap.get(taskId).addAll(splits);
+ });
+ noMigrationQueue.forEach(
+ (taskId, splits) -> {
+
newSourceSplitAllocateMap.get(taskId).removeAll(splits);
+ });
+ }
+ }
for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
newSourceSplitAllocateMap.entrySet()) {
@@ -330,7 +558,12 @@ public class RocketMQSourceEnumerator
.computeIfAbsent(pendingReader, k -> new ArrayList<>())
.addAll(pendingAssignmentForReader);
pendingAssignmentForReader.forEach(
- split ->
this.allocatedSet.add(split.getMessageQueue()));
+ split -> {
+ if (split.getIsIncrease()) {
+ this.allocatedSet.put(split.getMessageQueue(),
(byte) 1);
+
reflectedQueueToTaskId.put(split.getMessageQueue(), pendingReader);
+ }
+ });
}
}
@@ -352,6 +585,86 @@ public class RocketMQSourceEnumerator
}
}
+ private void handleInitAssignEvent(int taskId, SourceInitAssignEvent
initAssignEvent) {
+ if (this.initTask == null || this.initTask[taskId]) {
+ return;
+ }
+ lock.lock();
+ try {
+ // sync assign result
+ if (initAssignEvent.getSplits() != null &&
!initAssignEvent.getSplits().isEmpty()) {
+ log.info(
+ "Received SourceInitAssignEvent from reader {} with {}
splits.",
+ taskId,
+ initAssignEvent.getSplits().toString());
+ initAssignEvent
+ .getSplits()
+ .forEach(
+ split -> {
+ this.assignedMap
+ .computeIfAbsent(taskId, r -> new
HashSet<>())
+ .add(split);
+ this.checkedOffsets.put(
+ split.getMessageQueue(),
split.getStoppingOffset());
+ this.reflectedQueueToTaskId.put(
+ split.getMessageQueue(), taskId);
+
this.allocatedSet.put(split.getMessageQueue(), (byte) 1);
+ });
+ }
+ this.initTask[taskId] = true;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void handleOffsetEvent(int taskId, SourceReportOffsetEvent
sourceReportOffsetEvent) {
+ lock.lock();
+ try {
+ // Update offset of message queue
+ if (sourceReportOffsetEvent != null &&
sourceReportOffsetEvent.getCheckpoint() != -1) {
+ log.info(
+ "Received SourceReportOffsetEvent from reader {} with
offset {}",
+ taskId,
+ sourceReportOffsetEvent.getCheckpoint());
+ MessageQueue mq =
+ new MessageQueue(
+ sourceReportOffsetEvent.getTopic(),
+ sourceReportOffsetEvent.getBroker(),
+ sourceReportOffsetEvent.getQueueId());
+ this.checkedOffsets.put(mq,
sourceReportOffsetEvent.getCheckpoint());
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void notifyAssignResult() {
+ if (assignedMap.isEmpty()) {
+ return;
+ }
+ lock.lock();
+ try {
+ for (Map.Entry<Integer, Set<RocketMQSourceSplit>> entry :
assignedMap.entrySet()) {
+ SourceCheckEvent sourceCheckEvent = new SourceCheckEvent();
+ Map<MessageQueue, Tuple2<Long, Long>> assignedMq = new
HashMap<>();
+
+ entry.getValue()
+ .forEach(
+ split -> {
+ assignedMq.put(
+ split.getMessageQueue(),
+ new Tuple2<>(
+ split.getStartingOffset(),
+
split.getStoppingOffset()));
+ });
+ sourceCheckEvent.setAssignedMq(assignedMq);
+ context.sendEventToSourceReader(entry.getKey(),
sourceCheckEvent);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+
/** A container class to hold the newly added partitions and removed
partitions. */
@VisibleForTesting
private static class SourceChangeResult {
@@ -380,7 +693,7 @@ public class RocketMQSourceEnumerator
public static class SourceSplitChangeResult {
private final Set<RocketMQSourceSplit> increaseSet;
- private final Set<MessageQueue> decreaseSet;
+ private final Set<RocketMQSourceSplit> decreaseSet;
private SourceSplitChangeResult(Set<RocketMQSourceSplit> increaseSet) {
this.increaseSet = Collections.unmodifiableSet(increaseSet);
@@ -388,7 +701,7 @@ public class RocketMQSourceEnumerator
}
private SourceSplitChangeResult(
- Set<RocketMQSourceSplit> increaseSet, Set<MessageQueue>
decreaseSet) {
+ Set<RocketMQSourceSplit> increaseSet, Set<RocketMQSourceSplit>
decreaseSet) {
this.increaseSet = Collections.unmodifiableSet(increaseSet);
this.decreaseSet = Collections.unmodifiableSet(decreaseSet);
}
@@ -397,14 +710,14 @@ public class RocketMQSourceEnumerator
return increaseSet;
}
- public Set<MessageQueue> getDecreaseSet() {
+ public Set<RocketMQSourceSplit> getDecreaseSet() {
return decreaseSet;
}
}
@VisibleForTesting
private SourceChangeResult getSourceChangeResult(Set<MessageQueue>
latestSet) {
- Set<MessageQueue> currentSet =
Collections.unmodifiableSet(this.allocatedSet);
+ Set<MessageQueue> currentSet =
Collections.unmodifiableSet(this.allocatedSet.keySet());
Set<MessageQueue> increaseSet = Sets.difference(latestSet, currentSet);
Set<MessageQueue> decreaseSet = Sets.difference(currentSet, latestSet);
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
index bfd814e..6386b22 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategy.java
@@ -44,4 +44,17 @@ public interface AllocateStrategy {
*/
Map<Integer, Set<RocketMQSourceSplit>> allocate(
final Collection<RocketMQSourceSplit> mqAll, final int
parallelism);
+
+ /**
+ * Allocates RocketMQ source splits to Flink tasks based on the selected allocation strategy,
+ * given the number of queues that have already been allocated.
+ *
+ * <p>Implementations are free to ignore {@code globalAssignedNumber} and behave like
+ * {@link #allocate(Collection, int)}.
+ *
+ * @param mqAll a collection of the RocketMQ source splits to allocate
+ * @param parallelism the desired parallelism for the Flink tasks
+ * @param globalAssignedNumber number of queues already allocated before this call
+ * @return a map of task indices to sets of corresponding RocketMQ source splits
+ */
+ Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ final Collection<RocketMQSourceSplit> mqAll,
+ final int parallelism,
+ int globalAssignedNumber);
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
index 6c9d723..442afb9 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AllocateStrategyFactory.java
@@ -27,6 +27,7 @@ public class AllocateStrategyFactory {
public static final String STRATEGY_NAME_BROADCAST = "broadcast";
public static final String STRATEGY_NAME_CONSISTENT_HASH = "hash";
+ public static final String STRATEGY_NAME_AVERAGE = "average";
private AllocateStrategyFactory() {
// No public constructor.
@@ -46,6 +47,8 @@ public class AllocateStrategyFactory {
return new ConsistentHashAllocateStrategy();
case STRATEGY_NAME_BROADCAST:
return new BroadcastAllocateStrategy();
+ case STRATEGY_NAME_AVERAGE:
+ return new AverageAllocateStrategy();
default:
throw new IllegalArgumentException(
"We don't support this allocate strategy: " +
allocateStrategyName);
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
new file mode 100644
index 0000000..f808133
--- /dev/null
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategy.java
@@ -0,0 +1,43 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Allocation strategy that hands splits to readers one after another, continuing from the number
+ * of splits that were already allocated in earlier rounds.
+ */
+public class AverageAllocateStrategy implements AllocateStrategy {
+    @Override
+    public String getStrategyName() {
+        return AllocateStrategyFactory.STRATEGY_NAME_AVERAGE;
+    }
+
+    /**
+     * Allocates the splits as if nothing had been allocated before.
+     *
+     * @param mqAll the splits to allocate
+     * @param parallelism the number of readers
+     * @return a map from reader index to the splits given to that reader; never {@code null}
+     */
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism) {
+        // This overload used to return null, which made every caller of the two-argument
+        // form fail with a NullPointerException. Treat it as the first allocation round.
+        return allocate(mqAll, parallelism, 0);
+    }
+
+    /**
+     * Allocates the splits in iteration order, giving the i-th split of this call the position
+     * {@code globalAssignedNumber + i}.
+     *
+     * @param mqAll the splits to allocate
+     * @param parallelism the number of readers
+     * @param globalAssignedNumber number of splits already allocated before this call
+     * @return a map from reader index to the splits given to that reader; readers that receive
+     *     nothing have no entry
+     */
+    @Override
+    public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+            Collection<RocketMQSourceSplit> mqAll, int parallelism, int globalAssignedNumber) {
+        Map<Integer, Set<RocketMQSourceSplit>> result = new HashMap<>();
+        int nextPosition = globalAssignedNumber;
+        for (RocketMQSourceSplit mq : mqAll) {
+            int readerIndex = this.getSplitOwner(mq.getTopic(), nextPosition++, parallelism);
+            result.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(mq);
+        }
+        return result;
+    }
+
+    /**
+     * Picks the reader for one split.
+     *
+     * @param topic topic of the split; its hash selects the start index
+     * @param partition position of the split, used as the offset from the start index
+     * @param numReaders the number of readers
+     * @return the index of the owning reader, in {@code [0, numReaders)}
+     */
+    private int getSplitOwner(String topic, int partition, int numReaders) {
+        // Masking with 0x7FFFFFFF keeps the value non-negative even if the multiplication
+        // overflows.
+        int startIndex = ((topic.hashCode() * 31) & 0x7FFFFFFF) % numReaders;
+
+        // here, the assumption is that the id of RocketMQ partitions are always ascending
+        // starting from 0, and therefore can be used directly as the offset clockwise from the
+        // start index
+        return (startIndex + partition) % numReaders;
+    }
+}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
index 2e46419..f37cf04 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/BroadcastAllocateStrategy.java
@@ -40,4 +40,10 @@ public class BroadcastAllocateStrategy implements
AllocateStrategy {
}
return result;
}
+
+ /** Ignores {@code globalAssignedNumber} and delegates to the two-argument allocate. */
+ @Override
+ public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ Collection<RocketMQSourceSplit> mqAll, int parallelism, int
globalAssignedNumber) {
+ return allocate(mqAll, parallelism);
+ }
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
index 6a3ad2c..7cd10c4 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/ConsistentHashAllocateStrategy.java
@@ -48,4 +48,10 @@ public class ConsistentHashAllocateStrategy implements
AllocateStrategy {
}
return result;
}
+
+ /** Ignores {@code globalAssignedNumber} and delegates to the two-argument allocate. */
+ @Override
+ public Map<Integer, Set<RocketMQSourceSplit>> allocate(
+ Collection<RocketMQSourceSplit> mqAll, int parallelism, int
globalAssignedNumber) {
+ return allocate(mqAll, parallelism);
+ }
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
index 09c1049..57c8a63 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/metrics/RocketMQSourceReaderMetrics.java
@@ -45,4 +45,6 @@ public class RocketMQSourceReaderMetrics {
public RocketMQSourceReaderMetrics(SourceReaderMetricGroup
sourceReaderMetricGroup) {}
public void registerNewMessageQueue(MessageQueue messageQueue) {}
+
+ /** Hook for dropping a message queue from the reader metrics; the body is currently empty. */
+ public void unregisterMessageQueue(MessageQueue messageQueue) {}
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
index a635b17..242e19e 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceFetcherManager.java
@@ -95,4 +95,8 @@ public class RocketMQSourceFetcherManager
public void wakeUp() {}
});
}
+
+ /**
+ * Returns the split reader of the fetcher stored under key 0, cast to RocketMQSplitReader.
+ *
+ * <p>NOTE(review): presumably {@code fetchers.get(0)} is null until that fetcher has been
+ * created, which would make this throw - confirm callers only use it afterwards.
+ */
+ public RocketMQSplitReader getSplitReader() {
+ return (RocketMQSplitReader) fetchers.get(0).getSplitReader();
+ }
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
index 9303f35..042c12a 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSourceReader.java
@@ -19,30 +19,43 @@
package org.apache.flink.connector.rocketmq.source.reader;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import
org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.rocketmq.common.event.SourceCheckEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceDetectEvent;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import
org.apache.flink.connector.rocketmq.common.event.SourceReportOffsetEvent;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
import
org.apache.flink.connector.rocketmq.source.metrics.RocketMQSourceReaderMetrics;
import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import
org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplitState;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
+import com.google.common.collect.Sets;
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
/** The source reader for RocketMQ partitions. */
public class RocketMQSourceReader<T>
@@ -57,6 +70,7 @@ public class RocketMQSourceReader<T>
private final SortedMap<Long, Map<MessageQueue, Long>> offsetsToCommit;
private final ConcurrentMap<MessageQueue, Long> offsetsOfFinishedSplits;
private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
+ private final RocketMQSplitReader reader;
public RocketMQSourceReader(
FutureCompletingBlockingQueue<RecordsWithSplitIds<MessageView>>
elementsQueue,
@@ -64,7 +78,8 @@ public class RocketMQSourceReader<T>
RecordEmitter<MessageView, T, RocketMQSourceSplitState>
recordEmitter,
Configuration config,
SourceReaderContext context,
- RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics) {
+ RocketMQSourceReaderMetrics rocketMQSourceReaderMetrics,
+ Supplier<SplitReader<MessageView, RocketMQSourceSplit>>
readerSupplier) {
super(elementsQueue, rocketmqSourceFetcherManager, recordEmitter,
config, context);
this.offsetsToCommit = Collections.synchronizedSortedMap(new
TreeMap<>());
@@ -72,6 +87,7 @@ public class RocketMQSourceReader<T>
this.commitOffsetsOnCheckpoint =
config.get(RocketMQSourceOptions.COMMIT_OFFSETS_ON_CHECKPOINT);
this.rocketmqSourceReaderMetrics = rocketMQSourceReaderMetrics;
+ this.reader = (RocketMQSplitReader) readerSupplier.get();
}
@Override
@@ -136,10 +152,91 @@ public class RocketMQSourceReader<T>
@Override
protected RocketMQSourceSplit toSplitType(String splitId,
RocketMQSourceSplitState splitState) {
+ // Report checkpoint progress: send this split's current offset to the coordinator
+ // before turning the state back into a plain split.
+ SourceReportOffsetEvent sourceEvent = new SourceReportOffsetEvent();
+ sourceEvent.setBroker(splitState.getBrokerName());
+ sourceEvent.setTopic(splitState.getTopic());
+ sourceEvent.setQueueId(splitState.getQueueId());
+ sourceEvent.setCheckpoint(splitState.getCurrentOffset());
+ context.sendSourceEventToCoordinator(sourceEvent);
+ LOG.info("Report checkpoint progress: {}", sourceEvent);
return splitState.getSourceSplit();
}
+    /**
+     * Dispatches events sent by the enumerator: a {@link SourceDetectEvent} triggers a report of
+     * the currently held splits, a {@link SourceCheckEvent} triggers reconciliation with the
+     * assignment it carries. Any other event is ignored.
+     */
+    @Override
+    public void handleSourceEvents(SourceEvent sourceEvent) {
+        if (sourceEvent instanceof SourceDetectEvent) {
+            handleSourceDetectEvent();
+            return;
+        }
+        if (sourceEvent instanceof SourceCheckEvent) {
+            handleSourceCheckEvent((SourceCheckEvent) sourceEvent);
+        }
+    }
+
// ------------------------
+    /**
+     * Answers a detect request: reports every queue in the split reader's offset table to the
+     * coordinator as a {@link SourceInitAssignEvent} (an empty list when the table is empty) and
+     * then marks the split reader as initialized.
+     */
+    private void handleSourceDetectEvent() {
+        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> offsetTable =
+                reader.getCurrentOffsetTable();
+
+        List<RocketMQSourceSplit> ownedSplits = new LinkedList<>();
+        offsetTable.forEach(
+                (queue, offsets) ->
+                        ownedSplits.add(new RocketMQSourceSplit(queue, offsets.f0, offsets.f1)));
+
+        SourceInitAssignEvent initAssignEvent = new SourceInitAssignEvent();
+        initAssignEvent.setSplits(ownedSplits);
+        context.sendSourceEventToCoordinator(initAssignEvent);
+        reader.setInitFinish(true);
+    }
+
+    /**
+     * Reconciles the queues this reader currently consumes with the assignment the enumerator
+     * sent: queues that are assigned but not yet consumed are added, queues that are consumed but
+     * no longer assigned are released.
+     *
+     * @param sourceEvent the check event carrying the assigned queues and their (starting,
+     *     stopping) offsets
+     */
+    private void handleSourceCheckEvent(SourceCheckEvent sourceEvent) {
+        Map<MessageQueue, Tuple2<Long, Long>> checkMap = sourceEvent.getAssignedMq();
+        ConcurrentMap<MessageQueue, Tuple2<Long, Long>> currentOffsetTable =
+                reader.getCurrentOffsetTable();
+        Set<MessageQueue> assignedMq = checkMap.keySet();
+        Set<MessageQueue> currentMq = currentOffsetTable.keySet();
+        Set<MessageQueue> increaseSet = Sets.difference(assignedMq, currentMq);
+        Set<MessageQueue> decreaseSet = Sets.difference(currentMq, assignedMq);
+
+        if (increaseSet.isEmpty() && decreaseSet.isEmpty()) {
+            LOG.info("No need to checkpoint, current assigned mq is same as before.");
+        }
+
+        // Sets.difference returns live views over the offset table, which handleSplitsChanges
+        // mutates. Materialize both change lists before applying either of them.
+        List<RocketMQSourceSplit> increase =
+                increaseSet.stream()
+                        .map(
+                                mq ->
+                                        new RocketMQSourceSplit(
+                                                mq,
+                                                checkMap.get(mq).f0,
+                                                checkMap.get(mq).f1,
+                                                true))
+                        .collect(Collectors.toList());
+
+        // A queue being released is, by definition, absent from checkMap, so looking its
+        // offsets up there always failed with a NullPointerException. Take them from the
+        // reader's own table instead.
+        List<RocketMQSourceSplit> decrease = new LinkedList<>();
+        for (MessageQueue mq : decreaseSet) {
+            Tuple2<Long, Long> offsets = currentOffsetTable.get(mq);
+            if (offsets == null) {
+                // Already dropped from the table in the meantime; nothing left to release.
+                continue;
+            }
+            decrease.add(new RocketMQSourceSplit(mq, offsets.f0, offsets.f1, false));
+        }
+
+        if (!increase.isEmpty()) {
+            reader.handleSplitsChanges(new SplitsAddition<>(increase));
+        }
+        if (!decrease.isEmpty()) {
+            reader.handleSplitsChanges(new SplitsAddition<>(decrease));
+        }
+    }
@VisibleForTesting
SortedMap<Long, Map<MessageQueue, Long>> getOffsetsToCommit() {
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
index 3a07966..9a0833c 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/reader/RocketMQSplitReader.java
@@ -27,6 +27,8 @@ import
org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import
org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.rocketmq.common.config.RocketMQOptions;
+import org.apache.flink.connector.rocketmq.common.event.SourceInitAssignEvent;
+import org.apache.flink.connector.rocketmq.common.lock.SpinLock;
import org.apache.flink.connector.rocketmq.source.InnerConsumer;
import org.apache.flink.connector.rocketmq.source.InnerConsumerImpl;
import org.apache.flink.connector.rocketmq.source.RocketMQSourceOptions;
@@ -36,11 +38,13 @@ import
org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
import org.apache.flink.connector.rocketmq.source.util.UtilAll;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
+
import org.apache.rocketmq.common.message.MessageQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
@@ -81,6 +85,11 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
private final ConcurrentMap<MessageQueue, Tuple2<Long, Long>>
currentOffsetTable;
private final RocketMQSourceReaderMetrics rocketmqSourceReaderMetrics;
+ // Init status : true-finish init; false-not finish init
+ private boolean initFinish;
+ private final RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds;
+ private final SpinLock lock;
+
public RocketMQSplitReader(
Configuration configuration,
SourceReaderContext sourceReaderContext,
@@ -92,6 +101,9 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
this.deserializationSchema = deserializationSchema;
this.offsetsToCommit = new TreeMap<>();
this.currentOffsetTable = new ConcurrentHashMap<>();
+ this.recordsWithSplitIds = new
RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
+ this.initFinish = false;
+ this.lock = new SpinLock();
this.consumer = new InnerConsumerImpl(configuration);
this.consumer.start();
@@ -103,9 +115,17 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
@Override
public RecordsWithSplitIds<MessageView> fetch() throws IOException {
+ lock.lock();
wakeup = false;
RocketMQRecordsWithSplitIds<MessageView> recordsWithSplitIds =
new RocketMQRecordsWithSplitIds<>(rocketmqSourceReaderMetrics);
+ try {
+ this.recordsWithSplitIds.finishedSplits.forEach(
+ splitId -> recordsWithSplitIds.addFinishedSplit(splitId));
+ this.recordsWithSplitIds.finishedSplits.clear();
+ } finally {
+ lock.unlock();
+ }
try {
Duration duration =
Duration.ofMillis(this.configuration.getLong(RocketMQOptions.POLL_TIMEOUT));
@@ -129,6 +149,7 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
} catch (Exception e) {
LOG.error("Reader fetch split error", e);
}
+
return recordsWithSplitIds;
}
@@ -142,38 +163,53 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
splitsChange.getClass()));
}
+ if (!initFinish) {
+ LOG.info("Start to init reader");
+ SourceInitAssignEvent sourceEvent = new SourceInitAssignEvent();
+ sourceEvent.setSplits(splitsChange.splits());
+ sourceReaderContext.sendSourceEventToCoordinator(sourceEvent);
+ initFinish = true;
+ }
+ lock.lock();
+
+ LOG.info("Receive split change: " + splitsChange.splits().toString());
// Assignment.
ConcurrentMap<MessageQueue, Tuple2<Long, Long>> newOffsetTable = new
ConcurrentHashMap<>();
- // Set up the stopping timestamps.
- splitsChange
- .splits()
- .forEach(
- split -> {
- MessageQueue messageQueue =
- new MessageQueue(
- split.getTopic(),
- split.getBrokerName(),
- split.getQueueId());
- newOffsetTable.put(
- messageQueue,
- new Tuple2<>(
- split.getStartingOffset(),
split.getStoppingOffset()));
-
rocketmqSourceReaderMetrics.registerNewMessageQueue(messageQueue);
- });
-
- // todo: log message queue change
-
+ try {
+ // Set up the stopping timestamps.
+ splitsChange
+ .splits()
+ .forEach(
+ split -> {
+ if (!split.getIsIncrease()) {
+ finishSplitAtRecord(
+ split.getMessageQueue(),
+ split.getStoppingOffset(),
+ recordsWithSplitIds);
+ } else {
+ if
(!currentOffsetTable.containsKey(split.getMessageQueue())) {
+ registerSplits(split);
+ newOffsetTable.put(
+ split.getMessageQueue(),
+ new Tuple2<>(
+
split.getStartingOffset(),
+
split.getStoppingOffset()));
+ }
+ }
+ });
+ } finally {
+ lock.unlock();
+ }
// It will replace the previous assignment
- Set<MessageQueue> incrementalSplits = newOffsetTable.keySet();
- consumer.assign(incrementalSplits);
+ consumer.assign(currentOffsetTable.keySet());
// set offset to consumer
for (Map.Entry<MessageQueue, Tuple2<Long, Long>> entry :
newOffsetTable.entrySet()) {
MessageQueue messageQueue = entry.getKey();
Long startingOffset = entry.getValue().f0;
try {
- consumer.seek(messageQueue, startingOffset);
+ consumer.seek(messageQueue, startingOffset == -1L ? 0L :
startingOffset);
} catch (Exception e) {
String info =
String.format(
@@ -207,14 +243,31 @@ public class RocketMQSplitReader<T> implements
SplitReader<MessageView, RocketMQ
}
}
+ /** Returns the reader's own queue-to-offsets table (the map itself, not a copy). */
+ public ConcurrentMap<MessageQueue, Tuple2<Long, Long>>
getCurrentOffsetTable() {
+ return currentOffsetTable;
+ }
+
+    /**
+     * Starts tracking a split: stores its (starting, stopping) offsets under its message queue
+     * and registers the queue with the reader metrics.
+     */
+    private void registerSplits(RocketMQSourceSplit split) {
+        LOG.info("Register split {}", split.splitId());
+        MessageQueue queue = split.getMessageQueue();
+        Tuple2<Long, Long> offsetRange =
+                new Tuple2<>(split.getStartingOffset(), split.getStoppingOffset());
+        this.currentOffsetTable.put(queue, offsetRange);
+        this.rocketmqSourceReaderMetrics.registerNewMessageQueue(queue);
+    }
+
+ /** Sets the init status: true - init finished, false - init not finished. */
+ public void setInitFinish(boolean initFinish) {
+ this.initFinish = initFinish;
+ }
+
private void finishSplitAtRecord(
MessageQueue messageQueue,
long currentOffset,
RocketMQRecordsWithSplitIds<MessageView> recordsBySplits) {
-
LOG.info("message queue {} has reached stopping offset {}",
messageQueue, currentOffset);
- // recordsBySplits.addFinishedSplit(getSplitId(messageQueue));
+
this.currentOffsetTable.remove(messageQueue);
+ this.rocketmqSourceReaderMetrics.unregisterMessageQueue(messageQueue);
+
recordsBySplits.addFinishedSplit(RocketMQSourceSplit.toSplitId(messageQueue));
}
// ---------------- private helper class ------------------------
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
index 36c7a0f..eb37e19 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializer.java
@@ -47,6 +47,7 @@ public class RocketMQPartitionSplitSerializer
out.writeInt(split.getQueueId());
out.writeLong(split.getStartingOffset());
out.writeLong(split.getStoppingOffset());
+ out.writeBoolean(split.getIsIncrease());
out.flush();
return byteArrayOutputStream.toByteArray();
}
@@ -61,8 +62,13 @@ public class RocketMQPartitionSplitSerializer
int partition = in.readInt();
long startingOffset = in.readLong();
long stoppingOffset = in.readLong();
+ if (version == SNAPSHOT_VERSION) {
+ return new RocketMQSourceSplit(
+ topic, broker, partition, startingOffset,
stoppingOffset);
+ }
+ boolean isIncrease = in.readBoolean();
return new RocketMQSourceSplit(
- topic, broker, partition, startingOffset, stoppingOffset);
+ topic, broker, partition, startingOffset, stoppingOffset,
isIncrease);
}
}
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
index 7124086..a94044d 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplit.java
@@ -37,6 +37,8 @@ public class RocketMQSourceSplit implements SourceSplit {
private final int queueId;
private final long startingOffset;
private final long stoppingOffset;
+ // whether the split is increase or decrease: true-increase, false-decrease
+ private final boolean isIncrease;
public RocketMQSourceSplit(
MessageQueue messageQueue, long startingOffset, long
stoppingOffset) {
@@ -48,6 +50,20 @@ public class RocketMQSourceSplit implements SourceSplit {
stoppingOffset);
}
+ /**
+ * Creates a split for the given message queue with an explicit increase/decrease flag.
+ *
+ * @param isIncrease true - increase, false - decrease
+ */
+ public RocketMQSourceSplit(
+ MessageQueue messageQueue,
+ long startingOffset,
+ long stoppingOffset,
+ boolean isIncrease) {
+ this(
+ messageQueue.getTopic(),
+ messageQueue.getBrokerName(),
+ messageQueue.getQueueId(),
+ startingOffset,
+ stoppingOffset,
+ isIncrease);
+ }
+
public RocketMQSourceSplit(
String topic,
String brokerName,
@@ -59,6 +75,22 @@ public class RocketMQSourceSplit implements SourceSplit {
this.queueId = queueId;
this.startingOffset = startingOffset;
this.stoppingOffset = stoppingOffset;
+ this.isIncrease = true;
+ }
+
+ public RocketMQSourceSplit(
+ String topic,
+ String brokerName,
+ int queueId,
+ long startingOffset,
+ long stoppingOffset,
+ boolean isIncrease) {
+ this.topic = topic;
+ this.brokerName = brokerName;
+ this.queueId = queueId;
+ this.startingOffset = startingOffset;
+ this.stoppingOffset = stoppingOffset;
+ this.isIncrease = isIncrease;
}
public String getTopic() {
@@ -85,6 +117,28 @@ public class RocketMQSourceSplit implements SourceSplit {
return new MessageQueue(topic, brokerName, queueId);
}
+ /** Returns whether the split is an increase (true) or a decrease (false). */
+ public boolean getIsIncrease() {
+ return isIncrease;
+ }
+
+ /**
+ * Builds the split id of a message queue: topic, broker name and queue id joined by
+ * SEPARATOR, the same layout {@link #splitId()} produces.
+ */
+ public static String toSplitId(MessageQueue messageQueue) {
+ return messageQueue.getTopic()
+ + SEPARATOR
+ + messageQueue.getBrokerName()
+ + SEPARATOR
+ + messageQueue.getQueueId();
+ }
+
+ /** Returns a new split carrying the same six field values as {@code split}. */
+ public static RocketMQSourceSplit clone(RocketMQSourceSplit split) {
+ return new RocketMQSourceSplit(
+ split.topic,
+ split.brokerName,
+ split.queueId,
+ split.startingOffset,
+ split.stoppingOffset,
+ split.isIncrease);
+ }
+
@Override
public String splitId() {
return topic + SEPARATOR + brokerName + SEPARATOR + queueId;
@@ -93,13 +147,13 @@ public class RocketMQSourceSplit implements SourceSplit {
@Override
public String toString() {
return String.format(
- "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d,
MaxOffset: %d)",
- topic, brokerName, queueId, startingOffset, stoppingOffset);
+ "(Topic: %s, BrokerName: %s, QueueId: %d, MinOffset: %d,
MaxOffset: %d, status: %s)",
+ topic, brokerName, queueId, startingOffset, stoppingOffset,
isIncrease);
}
@Override
public int hashCode() {
- return Objects.hash(topic, brokerName, queueId, startingOffset,
stoppingOffset);
+ return Objects.hash(topic, brokerName, queueId, startingOffset,
stoppingOffset, isIncrease);
}
@Override
@@ -112,6 +166,7 @@ public class RocketMQSourceSplit implements SourceSplit {
&& brokerName.equals(other.brokerName)
&& queueId == other.queueId
&& startingOffset == other.startingOffset
- && stoppingOffset == other.stoppingOffset;
+ && stoppingOffset == other.stoppingOffset
+ && isIncrease == other.isIncrease;
}
}
diff --git
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
index ca74d58..5f16a80 100644
---
a/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
+++
b/src/main/java/org/apache/flink/connector/rocketmq/source/split/RocketMQSourceSplitState.java
@@ -48,6 +48,11 @@ public class RocketMQSourceSplitState extends
RocketMQSourceSplit {
*/
public RocketMQSourceSplit getSourceSplit() {
return new RocketMQSourceSplit(
- getTopic(), getBrokerName(), getQueueId(), getCurrentOffset(),
getStoppingOffset());
+ getTopic(),
+ getBrokerName(),
+ getQueueId(),
+ getCurrentOffset(),
+ getStoppingOffset(),
+ getIsIncrease());
}
}
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
index 08371b3..78ffd17 100644
---
a/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
+++
b/src/test/java/org/apache/flink/connector/rocketmq/legacy/sourceFunction/RocketMQSourceFunctionTest.java
@@ -26,7 +26,6 @@ import
org.apache.flink.connector.rocketmq.legacy.common.serialization.SimpleStr
import org.apache.flink.connector.rocketmq.legacy.common.util.TestUtils;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
-import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.common.message.MessageQueue;
import org.junit.Assert;
import org.junit.Test;
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
new file mode 100644
index 0000000..f63c8d1
--- /dev/null
+++
b/src/test/java/org/apache/flink/connector/rocketmq/source/enumerator/allocate/AverageAllocateStrategyTest.java
@@ -0,0 +1,97 @@
+package org.apache.flink.connector.rocketmq.source.enumerator.allocate;
+
+import org.apache.flink.connector.rocketmq.source.split.RocketMQSourceSplit;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+/** Checks that {@link AverageAllocateStrategy} spreads splits evenly over the readers. */
+public class AverageAllocateStrategyTest {
+
+    private static final String BROKER_NAME = "brokerName";
+    private static final String PREFIX_TOPIC = "test-topic-";
+    private static final int NUM_SPLITS = 900;
+    private static final int[] SPLIT_SIZE = {1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000};
+
+    /** Builds one split per index in {@code [fromInclusive, toExclusive)}. */
+    private static Collection<RocketMQSourceSplit> buildSplits(int fromInclusive, int toExclusive) {
+        Collection<RocketMQSourceSplit> splits = new ArrayList<>();
+        for (int i = fromInclusive; i < toExclusive; i++) {
+            splits.add(
+                    new RocketMQSourceSplit(
+                            PREFIX_TOPIC + (i + 1),
+                            BROKER_NAME,
+                            i,
+                            0,
+                            SPLIT_SIZE[i % SPLIT_SIZE.length]));
+        }
+        return splits;
+    }
+
+    /** Asserts that readers 0, 1 and 2 each hold exactly {@code expected} splits. */
+    private static void assertEvenShare(
+            int expected, Map<Integer, Set<RocketMQSourceSplit>> allocation) {
+        assertEquals(expected, allocation.get(0).size());
+        assertEquals(expected, allocation.get(1).size());
+        assertEquals(expected, allocation.get(2).size());
+    }
+
+    @Test
+    public void averageAllocateStrategyTest() {
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        int parallelism = 3;
+
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(buildSplits(0, NUM_SPLITS), parallelism, 0);
+
+        assertEvenShare(NUM_SPLITS / parallelism, result);
+    }
+
+    @Test
+    public void averagesAllocateStrategyTest() {
+        AllocateStrategy allocateStrategy = new AverageAllocateStrategy();
+        int parallelism = 3;
+
+        // First round: nothing has been allocated yet.
+        Map<Integer, Set<RocketMQSourceSplit>> result =
+                allocateStrategy.allocate(buildSplits(0, NUM_SPLITS), parallelism, 0);
+        assertEvenShare(NUM_SPLITS / parallelism, result);
+
+        // Second round: 8 more splits on top of the first NUM_SPLITS.
+        Map<Integer, Set<RocketMQSourceSplit>> result1 =
+                allocateStrategy.allocate(
+                        buildSplits(NUM_SPLITS, 8 + NUM_SPLITS), parallelism, NUM_SPLITS);
+
+        // Third round: 7 more splits on top of the previous NUM_SPLITS + 8.
+        Map<Integer, Set<RocketMQSourceSplit>> result2 =
+                allocateStrategy.allocate(
+                        buildSplits(8 + NUM_SPLITS, 8 + 7 + NUM_SPLITS),
+                        parallelism,
+                        NUM_SPLITS + 8);
+
+        result1.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+        result2.forEach((k, v) -> result.computeIfAbsent(k, r -> new HashSet<>()).addAll(v));
+
+        // No matter how many times it's assigned, it's always equal
+        assertEvenShare((NUM_SPLITS + 8 + 7) / parallelism, result);
+    }
+}
diff --git
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
index f1fa5af..432994e 100644
---
a/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
+++
b/src/test/java/org/apache/flink/connector/rocketmq/source/split/RocketMQPartitionSplitSerializerTest.java
@@ -36,4 +36,56 @@ public class RocketMQPartitionSplitSerializerTest {
serializer.deserialize(serializer.getVersion(),
serializer.serialize(expected));
assertEquals(expected, actual);
}
+
+    @Test
+    public void testSerializeAndDeserialize() throws IOException {
+        // Round trip: serialize a split, then read it back at the serializer's own version.
+        RocketMQPartitionSplitSerializer codec = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit expected =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+        byte[] payload = codec.serialize(expected);
+        RocketMQSourceSplit actual = codec.deserialize(codec.getVersion(), payload);
+
+        // Each getter must report the same value before and after the round trip.
+        assertEquals(expected.getTopic(), actual.getTopic());
+        assertEquals(expected.getBrokerName(), actual.getBrokerName());
+        assertEquals(expected.getQueueId(), actual.getQueueId());
+        assertEquals(expected.getStartingOffset(), actual.getStartingOffset());
+        assertEquals(expected.getStoppingOffset(), actual.getStoppingOffset());
+        assertEquals(expected.getIsIncrease(), actual.getIsIncrease());
+    }
+
+    @Test
+    public void testDeserializeWithOldVersion() throws IOException {
+        // The payload is written by the current serializer but decoded with version 1.
+        RocketMQPartitionSplitSerializer codec = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit expected =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+        RocketMQSourceSplit actual = codec.deserialize(1, codec.serialize(expected));
+
+        // All six getters are expected to come back unchanged.
+        assertEquals(expected.getTopic(), actual.getTopic());
+        assertEquals(expected.getBrokerName(), actual.getBrokerName());
+        assertEquals(expected.getQueueId(), actual.getQueueId());
+        assertEquals(expected.getStartingOffset(), actual.getStartingOffset());
+        assertEquals(expected.getStoppingOffset(), actual.getStoppingOffset());
+        assertEquals(expected.getIsIncrease(), actual.getIsIncrease());
+    }
+
+    @Test
+    public void testDeserializeWithOldVersion1() throws IOException {
+        // The payload is written by the current serializer but decoded with version 0.
+        RocketMQPartitionSplitSerializer codec = new RocketMQPartitionSplitSerializer();
+        RocketMQSourceSplit expected =
+                new RocketMQSourceSplit("testTopic", "testBroker", 0, 100L, 200L, false);
+        RocketMQSourceSplit actual = codec.deserialize(0, codec.serialize(expected));
+
+        assertEquals(expected.getTopic(), actual.getTopic());
+        assertEquals(expected.getBrokerName(), actual.getBrokerName());
+        assertEquals(expected.getQueueId(), actual.getQueueId());
+        assertEquals(expected.getStartingOffset(), actual.getStartingOffset());
+        assertEquals(expected.getStoppingOffset(), actual.getStoppingOffset());
+        // Decoding as version 0 must report getIsIncrease() as true, not the false used above.
+        assertEquals(true, actual.getIsIncrease());
+    }
}