Carl-Zhou-CN commented on code in PR #9777:
URL: https://github.com/apache/seatunnel/pull/9777#discussion_r2318993227
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java:
##########
@@ -150,41 +177,40 @@ public List<CatalogTable> getProducedCatalogTables() {
}
@Override
- public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(jobContext.getJobMode())
- ? Boundedness.BOUNDED
- : Boundedness.UNBOUNDED;
- }
-
- @Override
- public void setJobContext(JobContext jobContext) {
- this.jobContext = jobContext;
- }
-
- @Override
- public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
+ public SourceReader<SeaTunnelRow, RocketMQPartitionSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new RocketMqSourceReader(this.metadata, deserializationSchema,
readerContext);
+ BlockingQueue<RecordsWithSplitIds<MessageExt>> elementsQueue =
+ new
LinkedBlockingQueue<>(this.metadata.getBaseConfig().getBatchSize());
+ Supplier<RocketMQPartitionSplitReader> splitReader =
+ () -> new RocketMQPartitionSplitReader(this.metadata,
readerContext);
+
+ RocketMQSourceFetcherManager kafkaSourceFetcherManager =
Review Comment:
```suggestion
RocketMQSourceFetcherManager rocketMQSourceFetcherManager =
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqRecordEmitter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RocketMqRecordEmitter
+ implements RecordEmitter<MessageExt, SeaTunnelRow,
RocketMQPartitionSplitState> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(RocketMqRecordEmitter.class);
+ private final OutputCollector<SeaTunnelRow> outputCollector;
+ protected final SourceReader.Context context;
+ protected final Counter maxRecordFetchDelay;
+ // partition,maxDelay
+ private final Map<Integer, Long> maxDelay;
+
+ private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+ public final String RECORD_FETCH_DELAY = "RecordFetchDelay";
+
+ public RocketMqRecordEmitter(
+ DeserializationSchema<SeaTunnelRow> deserializationSchema,
+ SourceReader.Context context) {
+ this.deserializationSchema = deserializationSchema;
+ this.context = context;
+ this.outputCollector = new OutputCollector<>();
+ this.maxRecordFetchDelay =
context.getMetricsContext().counter(RECORD_FETCH_DELAY);
+ this.maxDelay = new HashMap<>();
+ }
+
+ @Override
+ public void emitRecord(
+ MessageExt consumerRecord,
+ Collector<SeaTunnelRow> collector,
+ RocketMQPartitionSplitState splitState)
+ throws Exception {
+ outputCollector.output = collector;
+ reportMetrics(consumerRecord);
+ deserializationSchema.deserialize(consumerRecord.getBody(),
outputCollector);
+ // consumerRecord.offset + 1 is the offset commit to Kafka and also
the start offset
Review Comment:
```suggestion
// consumerRecord.offset + 1 is the offset commit to RocketMQ and
also the start offset
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/fetch/RocketMQSourceFetcherManager.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source.fetch;
+
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcher;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherTask;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.source.OffsetCommitCallback;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMQPartitionSplit;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.source.RocketMQPartitionSplitReader;
+
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import lombok.SneakyThrows;
+
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.function.Supplier;
+
+/**
+ * @author 02211659 bianxiang
+ * @date 2025-08-19 10:06:05
Review Comment:
Please delete this author/date tag — we have git history for authorship information.
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMQPartitionSplitReader.java:
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RocketMQPartitionSplitReader
+ implements SplitReader<MessageExt, RocketMQPartitionSplit> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RocketMQPartitionSplitReader.class);
Review Comment:
Within a module we should keep the logging implementation uniform: use either the
Lombok `@Slf4j` annotation or this explicit declaration consistently across all classes.
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSource.java:
##########
@@ -150,41 +177,40 @@ public List<CatalogTable> getProducedCatalogTables() {
}
@Override
- public Boundedness getBoundedness() {
- return JobMode.BATCH.equals(jobContext.getJobMode())
- ? Boundedness.BOUNDED
- : Boundedness.UNBOUNDED;
- }
-
- @Override
- public void setJobContext(JobContext jobContext) {
- this.jobContext = jobContext;
- }
-
- @Override
- public SourceReader<SeaTunnelRow, RocketMqSourceSplit> createReader(
+ public SourceReader<SeaTunnelRow, RocketMQPartitionSplit> createReader(
SourceReader.Context readerContext) throws Exception {
- return new RocketMqSourceReader(this.metadata, deserializationSchema,
readerContext);
+ BlockingQueue<RecordsWithSplitIds<MessageExt>> elementsQueue =
+ new
LinkedBlockingQueue<>(this.metadata.getBaseConfig().getBatchSize());
+ Supplier<RocketMQPartitionSplitReader> splitReader =
+ () -> new RocketMQPartitionSplitReader(this.metadata,
readerContext);
+
+ RocketMQSourceFetcherManager kafkaSourceFetcherManager =
+ new RocketMQSourceFetcherManager(elementsQueue,
splitReader::get);
+ final RocketMqRecordEmitter rocketMqRecordEmitter =
+ new RocketMqRecordEmitter(deserializationSchema,
readerContext);
+
+ return new RocketMqSourceReader(
+ elementsQueue,
+ kafkaSourceFetcherManager,
Review Comment:
```suggestion
rocketMQSourceFetcherManager,
```
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMQPartitionSplitReader.java:
##########
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
+
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.common.utils.JsonUtils;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsAddition;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitsChange;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
+import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
+import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class RocketMQPartitionSplitReader
+ implements SplitReader<MessageExt, RocketMQPartitionSplit> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RocketMQPartitionSplitReader.class);
+
+ private final long pollTimeOut = 10000L;
+
+ private final List<String> topics;
+
+ private final ConsumerMetadata metadata;
+
+ private final Set<String> emptySplits = new HashSet<>();
+
+ private final Map<MessageQueue, Long> stoppingTimestamps;
+
+ private final DefaultLitePullConsumer consumer;
+
+ // todo batch mode need
+ private final Map<MessageQueue, Long> stoppingOffsets;
+
+ private final Map<MessageQueue, Long> commitOffsets;
+
+ private volatile boolean wakeup = false;
+
+ public RocketMQPartitionSplitReader(
+ ConsumerMetadata metadata, SourceReader.Context readerContext) {
+ this.metadata = metadata;
+ RocketMqBaseConfiguration config = metadata.getBaseConfig();
+ this.topics = metadata.getTopics();
+ this.stoppingTimestamps = new HashMap<>();
+ this.stoppingOffsets = new HashMap<>();
+ this.commitOffsets = new HashMap<>();
+ this.consumer =
+ initDefaultLitePullConsumer(
+ config,
+ metadata,
+ !metadata.isEnabledCommitCheckpoint(),
+ readerContext.getIndexOfSubtask());
+ try {
+ this.consumer.start();
+ } catch (MQClientException e) {
+ // Start rocketmq failed
+ throw new RocketMqConnectorException(
+ RocketMqConnectorErrorCode.CONSUMER_START_ERROR, e);
+ }
+ }
+
+ @Override
+ public RecordsWithSplitIds<MessageExt> fetch() throws IOException {
+ List<MessageExt> messageExts;
+ RocketMQPartitionSplitRecords recordsBySplits = new
RocketMQPartitionSplitRecords();
+ if (wakeup) {
+ wakeup = false;
+ recordsBySplits.prepareForRead();
+ return recordsBySplits;
+ }
+ try {
+ messageExts = consumer.poll(pollTimeOut);
Review Comment:
Can we add a parameter to control pollTimeOut?
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java:
##########
@@ -46,67 +43,76 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Slf4j
public class RocketMqSourceSplitEnumerator
- implements SourceSplitEnumerator<RocketMqSourceSplit,
RocketMqSourceState> {
+ implements SourceSplitEnumerator<RocketMQPartitionSplit,
RocketMqSourceState> {
- private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
- private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
private final ConsumerMetadata metadata;
- private final Context<RocketMqSourceSplit> context;
- private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
+ private final Context<RocketMQPartitionSplit> context;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;
- private final Object lock = new Object();
+ private final Map<MessageQueue, RocketMQPartitionSplit> assignedSplit;
+ private final Map<MessageQueue, RocketMQPartitionSplit> pendingSplit;
// ms
private long discoveryIntervalMillis;
+ Map<MessageQueue, Long> specificStartupOffsets;
+ private volatile boolean initialized;
+ private DefaultMQPullConsumer consumer;
+ private final Object lock = new Object();
+ /** The topic used for this RocketMQSource. */
+ private final List<String> topics;
+
+ private final long consumerOffsetTimestamp;
public RocketMqSourceSplitEnumerator(
- ConsumerMetadata metadata,
SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+ ConsumerMetadata metadata,
+ Context<RocketMQPartitionSplit> context,
+ long discoveryIntervalMillis) {
this.metadata = metadata;
+ final RocketMqBaseConfiguration config = this.metadata.getBaseConfig();
+ this.topics = this.metadata.getTopics();
+ this.consumerOffsetTimestamp =
this.metadata.getStartOffsetsTimestamp();
this.context = context;
+ this.discoveryIntervalMillis = discoveryIntervalMillis;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
+ specificStartupOffsets = this.metadata.getSpecificStartOffsets();
// Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
// `AsyncAppender-Dispatcher-Thread`
System.setProperty("rocketmq.client.logUseSlf4j", "true");
+ initialRocketMQConsumer(config);
Review Comment:
Wouldn't it be better if we reused the implementation within
RocketMqAdminUtil here?
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java:
##########
@@ -223,149 +259,106 @@ private void fetchPendingPartitionSplit() {
});
}
- private Set<RocketMqSourceSplit> getTopicInfo() {
- log.info("Configured topics: {}", metadata.getTopics());
- List<Map<MessageQueue, TopicOffset>> offsetTopics =
- RocketMqAdminUtil.offsetTopics(metadata.getBaseConfig(),
metadata.getTopics());
- Set<RocketMqSourceSplit> sourceSplits = Sets.newConcurrentHashSet();
- offsetTopics.forEach(
- messageQueueOffsets -> {
- messageQueueOffsets.forEach(
- (messageQueue, topicOffset) -> {
- sourceSplits.add(
- new RocketMqSourceSplit(
- messageQueue,
- topicOffset.getMinOffset(),
- topicOffset.getMaxOffset()));
- });
- });
- return sourceSplits;
- }
-
private void setPartitionStartOffset() throws MQClientException {
- Collection<MessageQueue> topicPartitions = pendingSplit.keySet();
- Map<MessageQueue, Long> topicPartitionOffsets = null;
- switch (metadata.getStartMode()) {
- case CONSUME_FROM_FIRST_OFFSET:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- break;
- case CONSUME_FROM_LAST_OFFSET:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- break;
- case CONSUME_FROM_TIMESTAMP:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
- break;
- case CONSUME_FROM_GROUP_OFFSETS:
- topicPartitionOffsets =
listConsumerGroupOffsets(topicPartitions);
- if (topicPartitionOffsets.isEmpty()) {
- topicPartitionOffsets =
- listOffsets(
- topicPartitions,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- }
- break;
- case CONSUME_FROM_SPECIFIC_OFFSETS:
- topicPartitionOffsets = metadata.getSpecificStartOffsets();
- // Fill in broker name
- setMessageQueueBroker(topicPartitions, topicPartitionOffsets);
- break;
- default:
- throw new RocketMqConnectorException(
-
RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
- metadata.getStartMode().name());
- }
- topicPartitionOffsets
- .entrySet()
- .forEach(
- entry -> {
- if (pendingSplit.containsKey(entry.getKey())) {
-
pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
- }
- });
- }
-
- private void setMessageQueueBroker(
- Collection<MessageQueue> topicPartitions,
- Map<MessageQueue, Long> topicPartitionOffsets) {
- Map<String, String> flatTopicPartitions =
- topicPartitions.stream()
- .collect(
- Collectors.toMap(
- messageQueue ->
- messageQueue.getTopic()
- + "-"
- +
messageQueue.getQueueId(),
- MessageQueue::getBrokerName));
- for (MessageQueue messageQueue : topicPartitionOffsets.keySet()) {
- String key = messageQueue.getTopic() + "-" +
messageQueue.getQueueId();
- if (flatTopicPartitions.containsKey(key)) {
- messageQueue.setBrokerName(flatTopicPartitions.get(key));
+ Set<MessageQueue> pendingMessageQueues = pendingSplit.keySet();
+ Map<MessageQueue, Long> topicPartitionOffsets = new HashMap<>();
+ for (MessageQueue mq : pendingMessageQueues) {
+ long offset;
+ switch (metadata.getStartMode()) {
+ case CONSUME_FROM_LAST_OFFSET:
+ offset = consumer.maxOffset(mq);
+ break;
+ case CONSUME_FROM_FIRST_OFFSET:
+ offset = consumer.minOffset(mq);
+ break;
+ case CONSUME_FROM_GROUP_OFFSETS:
+ offset = consumer.fetchConsumeOffset(mq, false);
+ // If broker throw exception,return -2.should be
distinguished from the
+ // initialization scenario
+ if (offset <= -2) {
+ throw new RuntimeException(
+ "An error occurred while fetching
offset,please check up server's log");
Review Comment:
Would it be better to keep using RocketMqConnectorException here for consistency?
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqRecordEmitter.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.schema.event.SchemaChangeEvent;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
+
+import org.apache.rocketmq.common.message.MessageExt;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RocketMqRecordEmitter
+ implements RecordEmitter<MessageExt, SeaTunnelRow,
RocketMQPartitionSplitState> {
+
+ private static final Logger logger =
LoggerFactory.getLogger(RocketMqRecordEmitter.class);
Review Comment:
as above
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java:
##########
@@ -223,149 +259,106 @@ private void fetchPendingPartitionSplit() {
});
}
- private Set<RocketMqSourceSplit> getTopicInfo() {
- log.info("Configured topics: {}", metadata.getTopics());
- List<Map<MessageQueue, TopicOffset>> offsetTopics =
- RocketMqAdminUtil.offsetTopics(metadata.getBaseConfig(),
metadata.getTopics());
- Set<RocketMqSourceSplit> sourceSplits = Sets.newConcurrentHashSet();
- offsetTopics.forEach(
- messageQueueOffsets -> {
- messageQueueOffsets.forEach(
- (messageQueue, topicOffset) -> {
- sourceSplits.add(
- new RocketMqSourceSplit(
- messageQueue,
- topicOffset.getMinOffset(),
- topicOffset.getMaxOffset()));
- });
- });
- return sourceSplits;
- }
-
private void setPartitionStartOffset() throws MQClientException {
- Collection<MessageQueue> topicPartitions = pendingSplit.keySet();
- Map<MessageQueue, Long> topicPartitionOffsets = null;
- switch (metadata.getStartMode()) {
- case CONSUME_FROM_FIRST_OFFSET:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- break;
- case CONSUME_FROM_LAST_OFFSET:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- break;
- case CONSUME_FROM_TIMESTAMP:
- topicPartitionOffsets =
- listOffsets(topicPartitions,
ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
- break;
- case CONSUME_FROM_GROUP_OFFSETS:
- topicPartitionOffsets =
listConsumerGroupOffsets(topicPartitions);
- if (topicPartitionOffsets.isEmpty()) {
- topicPartitionOffsets =
- listOffsets(
- topicPartitions,
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
- }
- break;
- case CONSUME_FROM_SPECIFIC_OFFSETS:
- topicPartitionOffsets = metadata.getSpecificStartOffsets();
- // Fill in broker name
- setMessageQueueBroker(topicPartitions, topicPartitionOffsets);
- break;
- default:
- throw new RocketMqConnectorException(
-
RocketMqConnectorErrorCode.UNSUPPORTED_START_MODE_ERROR,
- metadata.getStartMode().name());
- }
- topicPartitionOffsets
- .entrySet()
- .forEach(
- entry -> {
- if (pendingSplit.containsKey(entry.getKey())) {
-
pendingSplit.get(entry.getKey()).setStartOffset(entry.getValue());
- }
- });
- }
-
- private void setMessageQueueBroker(
- Collection<MessageQueue> topicPartitions,
- Map<MessageQueue, Long> topicPartitionOffsets) {
- Map<String, String> flatTopicPartitions =
- topicPartitions.stream()
- .collect(
- Collectors.toMap(
- messageQueue ->
- messageQueue.getTopic()
- + "-"
- +
messageQueue.getQueueId(),
- MessageQueue::getBrokerName));
- for (MessageQueue messageQueue : topicPartitionOffsets.keySet()) {
- String key = messageQueue.getTopic() + "-" +
messageQueue.getQueueId();
- if (flatTopicPartitions.containsKey(key)) {
- messageQueue.setBrokerName(flatTopicPartitions.get(key));
+ Set<MessageQueue> pendingMessageQueues = pendingSplit.keySet();
+ Map<MessageQueue, Long> topicPartitionOffsets = new HashMap<>();
+ for (MessageQueue mq : pendingMessageQueues) {
+ long offset;
+ switch (metadata.getStartMode()) {
+ case CONSUME_FROM_LAST_OFFSET:
+ offset = consumer.maxOffset(mq);
+ break;
+ case CONSUME_FROM_FIRST_OFFSET:
+ offset = consumer.minOffset(mq);
+ break;
+ case CONSUME_FROM_GROUP_OFFSETS:
+ offset = consumer.fetchConsumeOffset(mq, false);
+ // If broker throw exception,return -2.should be
distinguished from the
+ // initialization scenario
+ if (offset <= -2) {
+ throw new RuntimeException(
+ "An error occurred while fetching
offset,please check up server's log");
+ }
+ // the min offset return if consumer group first
join,return a negative number
+ // if catch exception when fetch from broker.
+ // If you want consumer from earliest,please use
OffsetResetStrategy.EARLIEST
+ if (offset <= 0) {
+ log.info(
+ "current consumer thread:{} has no committed
offset,use Strategy:earliest instead",
+ mq);
+ offset = consumer.minOffset(mq);
+ }
+ break;
+ case CONSUME_FROM_TIMESTAMP:
+ offset = consumer.searchOffset(mq,
consumerOffsetTimestamp);
+ break;
+ case CONSUME_FROM_SPECIFIC_OFFSETS:
+ if (specificStartupOffsets == null) {
+ throw new RuntimeException(
+ "StartMode is specific_offsets.But none
offsets has been specified");
+ }
+ Long specificOffset = specificStartupOffsets.get(mq);
+ if (specificOffset != null) {
+ offset = specificOffset;
+ } else {
+ offset = consumer.fetchConsumeOffset(mq, false);
+ }
+ break;
+ default:
+ throw new IllegalArgumentException(
+ "current startMode is not supported" +
metadata.getStartMode());
Review Comment:
ditto
##########
seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceSplitEnumerator.java:
##########
@@ -46,67 +43,76 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
@Slf4j
public class RocketMqSourceSplitEnumerator
- implements SourceSplitEnumerator<RocketMqSourceSplit,
RocketMqSourceState> {
+ implements SourceSplitEnumerator<RocketMQPartitionSplit,
RocketMqSourceState> {
- private static final long DEFAULT_DISCOVERY_INTERVAL_MILLIS = 60 * 1000;
- private final Map<MessageQueue, RocketMqSourceSplit> assignedSplit;
private final ConsumerMetadata metadata;
- private final Context<RocketMqSourceSplit> context;
- private final Map<MessageQueue, RocketMqSourceSplit> pendingSplit;
+ private final Context<RocketMQPartitionSplit> context;
private ScheduledExecutorService executor;
private ScheduledFuture scheduledFuture;
- private final Object lock = new Object();
+ private final Map<MessageQueue, RocketMQPartitionSplit> assignedSplit;
+ private final Map<MessageQueue, RocketMQPartitionSplit> pendingSplit;
// ms
private long discoveryIntervalMillis;
+ Map<MessageQueue, Long> specificStartupOffsets;
+ private volatile boolean initialized;
+ private DefaultMQPullConsumer consumer;
+ private final Object lock = new Object();
+ /** The topic used for this RocketMQSource. */
+ private final List<String> topics;
+
+ private final long consumerOffsetTimestamp;
public RocketMqSourceSplitEnumerator(
- ConsumerMetadata metadata,
SourceSplitEnumerator.Context<RocketMqSourceSplit> context) {
+ ConsumerMetadata metadata,
+ Context<RocketMQPartitionSplit> context,
+ long discoveryIntervalMillis) {
this.metadata = metadata;
+ final RocketMqBaseConfiguration config = this.metadata.getBaseConfig();
+ this.topics = this.metadata.getTopics();
+ this.consumerOffsetTimestamp =
this.metadata.getStartOffsetsTimestamp();
this.context = context;
+ this.discoveryIntervalMillis = discoveryIntervalMillis;
this.assignedSplit = new HashMap<>();
this.pendingSplit = new HashMap<>();
+ specificStartupOffsets = this.metadata.getSpecificStartOffsets();
// Set `rocketmq.client.logUseSlf4j` to `true` to avoid create many
// `AsyncAppender-Dispatcher-Thread`
System.setProperty("rocketmq.client.logUseSlf4j", "true");
+ initialRocketMQConsumer(config);
}
public RocketMqSourceSplitEnumerator(
ConsumerMetadata metadata,
- SourceSplitEnumerator.Context<RocketMqSourceSplit> context,
+ Set<RocketMQPartitionSplit> assignedSplit,
+ Context<RocketMQPartitionSplit> context,
long discoveryIntervalMillis) {
- this(metadata, context);
- this.discoveryIntervalMillis = discoveryIntervalMillis;
- }
-
- private static int getSplitOwner(MessageQueue messageQueue, int
numReaders) {
- int startIndex = ((messageQueue.getQueueId() * 31) & 0x7FFFFFFF) %
numReaders;
- return (startIndex + messageQueue.getQueueId()) % numReaders;
+ this(metadata, context, discoveryIntervalMillis);
+ assignedSplit.forEach(split ->
this.assignedSplit.put(split.getMessageQueue(), split));
}
@Override
public void open() {
- discoveryIntervalMillis =
- discoveryIntervalMillis > 0
- ? discoveryIntervalMillis
- : DEFAULT_DISCOVERY_INTERVAL_MILLIS;
if (discoveryIntervalMillis > 0) {
this.executor =
Executors.newScheduledThreadPool(
1,
runnable -> {
Thread thread = new Thread(runnable);
thread.setDaemon(true);
-
thread.setName("RocketMq-messageQueue-dynamic-discovery");
+
thread.setName("kafka-partition-dynamic-discovery");
Review Comment:
Naming error: this thread name should reference RocketMQ, not Kafka
(e.g. "RocketMq-messageQueue-dynamic-discovery").
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]