This is an automated email from the ASF dual-hosted git repository. duhengforever pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/rocketmq-flink.git
commit aae7134f56c38963711488331595ac1888d28558 Author: SteNicholas <[email protected]> AuthorDate: Tue Aug 24 15:48:20 2021 +0800 RocketMQSource improves the message consume of RocketMQPartitionSplitReader (#791) --- .../flink/source/reader/RocketMQPartitionSplitReader.java | 10 +++++++--- .../rocketmq/flink/source/split/RocketMQPartitionSplit.java | 7 ++++--- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java index 41fbbea..1846114 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java +++ b/src/main/java/org/apache/rocketmq/flink/source/reader/RocketMQPartitionSplitReader.java @@ -133,7 +133,7 @@ public class RocketMQPartitionSplitReader<T> ? consumer.searchOffset(messageQueue, startTime) : startOffset; } catch (MQClientException e) { - LOG.error( + LOG.warn( String.format( "Search RocketMQ message offset of topic[%s] broker[%s] queue[%d] exception.", messageQueue.getTopic(), @@ -159,13 +159,13 @@ public class RocketMQPartitionSplitReader<T> return recordsBySplits; } pullResult = - consumer.pullBlockIfNotFound( + consumer.pull( messageQueue, tag, messageOffset, MAX_MESSAGE_NUMBER_PER_BLOCK); } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) { - LOG.error( + LOG.warn( String.format( "Pull RocketMQ messages of topic[%s] broker[%s] queue[%d] tag[%s] from offset[%d] exception.", messageQueue.getTopic(), @@ -222,6 +222,10 @@ public class RocketMQPartitionSplitReader<T> } } recordsBySplits.prepareForRead(); + LOG.debug( + String.format( + "Fetch record splits for MetaQ subscribe message queues of topic[%s].", + topic)); return recordsBySplits; } diff --git a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java index 9bda60f..5717767 100644 --- a/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java +++ b/src/main/java/org/apache/rocketmq/flink/source/split/RocketMQPartitionSplit.java @@ -73,13 +73,13 @@ public class RocketMQPartitionSplit implements SourceSplit { @Override public String toString() { return String.format( - "[Topic: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]", - topic, partition, startingOffset, stoppingTimestamp); + "[Topic: %s, Broker: %s, Partition: %s, StartingOffset: %d, StoppingTimestamp: %d]", + topic, broker, partition, startingOffset, stoppingTimestamp); } @Override public int hashCode() { - return Objects.hash(topic, partition, startingOffset, stoppingTimestamp); + return Objects.hash(topic, broker, partition, startingOffset, stoppingTimestamp); } @Override @@ -89,6 +89,7 @@ public class RocketMQPartitionSplit implements SourceSplit { } RocketMQPartitionSplit other = (RocketMQPartitionSplit) obj; return topic.equals(other.topic) + && broker.equals(other.broker) && partition == other.partition && startingOffset == other.startingOffset && stoppingTimestamp == other.stoppingTimestamp;
